#!/usr/bin/env python3
"""
interface.py - Discord-Hermes Model Chat Interface
Description:
This script provides an interactive CLI for chatting with Hugging Face language models,
designed for Discord-style conversational flow. It supports both Transformers and GGUF
(llama.cpp) models, with optional LoRA adapter loading and bitsandbytes quantization.
Key Features & Quick Controls:
Keyboard Shortcuts:
• Enter → submit current input
• Ctrl+T → insert newline into the current prompt (multi-line messages)
• Ctrl+C → cancel mid-generation or mid-stream and keep partial output
• Ctrl+Z → exit cleanly, freeing VRAM and memory
Commands:
• /clear (/c, /reset, !clear, !c) → reset all context
• /back (/b, !b) → undo last user+assistant exchange and preview history
• /h [N] (!h) → enable history using last N exchanges (default: all)
• /d → disable history
• On-the-fly parameter tuning:
/min N → set min new tokens
/max N → set max new tokens
/temp X → set temperature
/p X → set top-p
/k N → set top-k
• Randomization:
/r → randomize params
/rh → randomize with high variance
• /stop → toggle stopping further extension mid-generation
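• Chaining: several parameter/toggle commands may be combined on one line and are applied left to right,
e.g. /temp .7 /p .85 /k 45 /min 5 /max 80 (values here are only illustrative)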
Generation Flow:
• Extension flow: prefer short replies (within the min/max token settings) but extend until EOS for natural endings
• Configurable prompt modes (system prompt, assistant prompt, blank mode)
Advanced:
• LoRA stacking: frozen base adapter + active adapter support
• Supports GGUF (llama.cpp) with selectable chat templates (--gguf-chat-format)
• Optional code detection and filtering with auto-reprompt
Arguments:
-m, --model Model path or Hugging Face repo ID (default: mookiezii/Discord-Hermes-3-8B)
-q, --quant Quantization mode: 4 or 8 (default: off). Use `-q` (no value) for 4-bit, or `-q 8` for 8-bit
-fl, --frozen-lora Model path or Hugging Face repo ID of the base LoRA adapter to load and freeze
-c, --checkpoint Model path or Hugging Face repo ID of the LoRA adapter to load
-chs, --checkpoint-subfolder Subfolder of the path or Hugging Face repo ID of the LoRA adapter to load
--deephermes Enable DeepHermes formatting instead of ChatML
--gguf Use GGUF model format with llama.cpp backend
--gguf-chat-format Chat format for GGUF models (default: "chatml")
--blank Raw user input only, no prompts/system context
-asc, --assistant-system-combo Include both system and assistant system prompts
-as, --assistant-system Use assistant system prompt instead of standard
--just-system-prompt Use only the system prompt with user input
--no-system-prompt Do not include system prompt
--no-assistant-prompt Do not include assistant prompt
--code-check Enable code detection and filtering via classifier
-au, --auto Run preset inputs (hello → what do you do → wow tell me more) 5 times with /clear in between, then exit
Usage (quick help):
python interface.py -h
USAGE / RECIPES:
Basic (Transformers, full precision):
python interface.py -m mookiezii/Discord-Hermes-3-8B
Quantization (Transformers):
# 4-bit:
python interface.py -m repo -q
# 8-bit:
python interface.py -m repo -q 8
# full precision:
python interface.py -m repo
GGUF (llama.cpp backend):
python interface.py --gguf -m /path/to/model.gguf --gguf-chat-format chatml
# alternate chat template:
python interface.py --gguf -m /path/to/model.gguf --gguf-chat-format alpaca
LoRA (frozen base + active adapter):
python interface.py -m base/model \
-fl path/to/frozen_base_lora \
-c path/to/active_adapter --checkpoint-subfolder adapter_subdir
Prompt modes:
# Raw user input, no system/assistant prompts:
python interface.py --blank
# Assistant system prompt instead of standard:
python interface.py --assistant-system
# System + assistant system combined:
python interface.py --assistant-system-combo
# Just the system prompt (no assistant prompt preface):
python interface.py --just-system-prompt
# Strip system prompt entirely:
python interface.py --no-system-prompt
# Strip assistant prompt entirely:
python interface.py --no-assistant-prompt
Format toggle:
# Use DeepHermes formatting instead of ChatML:
python interface.py --deephermes
Auto run demo + exit:
python interface.py --auto
"""
# MIT License
#
# Copyright (c) 2025 mookziei
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
import gc
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline, AutoConfig
from peft import PeftModel
from huggingface_hub import login
import argparse
import logging
import re
from transformers import TextStreamer
from datetime import datetime
from prompt_toolkit import PromptSession
from prompt_toolkit.key_binding import KeyBindings
import random
from transformers import LogitsProcessor
from transformers import LogitsProcessorList
import signal
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
parser = argparse.ArgumentParser(description="HuggingFace Model Chat Interface")
parser.add_argument("-au", "--auto", action="store_true",
help="Run preset inputs (hello → what do you do → wow tell me more) 5 times with /clear in between, then exit")
parser.add_argument("-m", "--model", default="mookiezii/Discord-Hermes-3-8B",
help="Model path or Hugging Face repo ID")
parser.add_argument(
"-q", "--quant",
nargs="?",
choices=("4", "8"),
const="4",
help="Quantization mode: 4 or 8. Omit this flag for full precision. Using `-q` alone selects 4-bit."
)
parser.add_argument("-fl", "--frozen-lora",
help="Model path or Hugging Face repo ID of the base LoRa adapter to load and freeze")
parser.add_argument("-c", "--checkpoint",
help="Model path or Hugging Face repo ID of the LoRa adapter to load")
parser.add_argument("-chs", "--checkpoint-subfolder",
help="Subfolder of the path or Hugging Face repo ID of the LoRa adapter to load")
parser.add_argument("--deephermes", action="store_true",
help="Enable DeepHermes formatting instead of ChatML")
parser.add_argument("--gguf", action="store_true",
help="Use GGUF model format (llama.cpp backend)")
parser.add_argument("--gguf-chat-format", default="chatml",
help='Chat format for GGUF models (default: "chatml")')
parser.add_argument("--blank", action="store_true",
help="Use only raw user input (no prompts/system context)")
parser.add_argument("-asc", "--assistant-system-combo", action="store_true",
help="Include both system and assistant system prompts")
parser.add_argument("-as", "--assistant-system", action="store_true",
help="Use assistant system prompt instead of standard system prompt")
parser.add_argument("--just-system-prompt", action="store_true",
help="Use only the system prompt with user input")
parser.add_argument("--no-system-prompt", action="store_true",
help="Do not include system prompt")
parser.add_argument("--no-assistant-prompt", action="store_true",
help="Do not include assistant prompt")
parser.add_argument("--code-check", action="store_true",
help="Enable code detection and filtering via classifier")
args = parser.parse_args()
# Apply args to variables
DEEPHERMES = args.deephermes
GGUF = args.gguf
GGUF_CHAT_FORMAT = args.gguf_chat_format
BLANK = args.blank
ASSISTANT_SYSTEM_COMBO = args.assistant_system_combo
ASSISTANT_SYSTEM = args.assistant_system
JUST_SYSTEM_PROMPT = args.just_system_prompt
NO_SYSTEM_PROMPT = args.no_system_prompt
NO_ASSISTANT_PROMPT = args.no_assistant_prompt
CODE_CHECK = args.code_check
QUANTIZATION = args.quant or "off"
USE_QUANT = QUANTIZATION in ("4", "8")
# ================================
base_model_name = args.model
FROZEN_LORA_PATH = args.frozen_lora
checkpoint_path = args.checkpoint or ""
checkpoint_subfolder = args.checkpoint_subfolder
USE_BASE_MODEL_ONLY = not (FROZEN_LORA_PATH or checkpoint_path)
# ================================
PLAIN_SYSTEM_PROMPT = """"""
ASSISTANT_SYSTEM_PROMPT = """"""
# ================================
if args.auto:
MIN_NEW_TOKENS = 4
MAX_NEW_TOKENS = 60
TEMPERATURE = 0.6
TOP_P = 0.9
TOP_K = 55
else:
MIN_NEW_TOKENS = random.randint(3, 5)
MAX_NEW_TOKENS = random.randint(40, 75)
TEMPERATURE = random.uniform(0.5, 0.9)
TOP_P = random.uniform(0.7, 0.9)
TOP_K = random.randint(40, 50)
# ================================
STOP_EXTENSION = False
# --- Prefer a short first pass (roughly 40-75 tokens), but extend until <|im_end|> (or EOS) ---
def generate_prefer_short_then_extend(
model,
tokenizer,
input_ids,
attention_mask=None,
first_min=MIN_NEW_TOKENS,
first_max=MAX_NEW_TOKENS,
extend_step=64,
hard_cap=1024,
**gen_kwargs,
):
import random, torch
# Resolve EOS ids (<|im_end|> for ChatML/Hermes, or tokenizer.eos_token_id)
eos_ids = []
if tokenizer.eos_token_id is not None:
eos_ids.append(tokenizer.eos_token_id)
try:
im_end_id = tokenizer.convert_tokens_to_ids("<|im_end|>")
if im_end_id is not None:
eos_ids.append(im_end_id)
except Exception:
pass
eos_ids = [eid for eid in eos_ids if eid is not None]
if not eos_ids:
raise ValueError("No EOS token id found. Make sure '<|im_end|>' exists or set tokenizer.eos_token.")
    # Strip keys we set manually below so there is no duplication (do_sample passes through from the caller)
    for key in ["max_new_tokens", "eos_token_id", "pad_token_id"]:
        gen_kwargs.pop(key, None)
start_len = input_ids.shape[1]
seq = input_ids
attn = attention_mask if attention_mask is not None else torch.ones_like(input_ids)
budget = random.randint(first_min, first_max)
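    # First pass: sample a token budget in [first_min, first_max]; each later pass adds extend_step more tokens
    # until an EOS id appears in the new tokens, the hard cap is reached, or STOP_EXTENSION is toggled on.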
while True:
out = model.generate(
input_ids=seq,
attention_mask=attn,
max_new_tokens=budget,
            eos_token_id=eos_ids,
            pad_token_id=tokenizer.pad_token_id or eos_ids[0],
return_dict_in_generate=True,
**gen_kwargs,
)
seq = out.sequences
attn = torch.ones_like(seq)
new_tokens = seq[0, start_len:].tolist()
if any(t in eos_ids for t in new_tokens):
break
if len(new_tokens) >= hard_cap:
break
if globals().get("STOP_EXTENSION", False):
break
budget = extend_step
return seq
# --checkpoint defaults to "" when omitted, so recompute the base-only flag from both adapter paths.
USE_BASE_MODEL_ONLY = not (FROZEN_LORA_PATH or checkpoint_path)
if ASSISTANT_SYSTEM:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>assistant<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>assistant\n{PLAIN_SYSTEM_PROMPT}<|im_end|>"
elif ASSISTANT_SYSTEM_COMBO:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>system<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}<|start_header_id|>assistant<|end_header_id|>\n{ASSISTANT_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>system\n{PLAIN_SYSTEM_PROMPT}<|im_end|>\n<|im_start|>assistant\n{ASSISTANT_SYSTEM_PROMPT}<|im_end|>"
else:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>system<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>system\n{PLAIN_SYSTEM_PROMPT}<|im_end|>"
def handle_sigstp(signum, frame):
print("\n\033[1;91m[Received Ctrl+Z — exiting.]\033[0m")
torch.cuda.empty_cache()
exit(0)
signal.signal(signal.SIGTSTP, handle_sigstp)
class RecentTokenBlocker(LogitsProcessor):
def __init__(self, window_size: int, eos_id: int | None = None):
self.window_size = window_size
self.eos_id = eos_id
def __call__(self, input_ids, scores):
# Penalize last N tokens (use a big negative instead of -inf)
recent_tokens = input_ids[0, -self.window_size:].tolist()
if recent_tokens:
for token_id in set(recent_tokens):
if 0 <= token_id < scores.shape[-1]:
scores[0, token_id] = -1e9
# Sanitize logits
scores = torch.nan_to_num(scores, neginf=-1e9)
# If everything is blocked, fallback to EOS
if torch.all(scores[0] <= -9e8):
scores[0].fill_(-1e9)
if self.eos_id is not None and 0 <= self.eos_id < scores.shape[-1]:
scores[0, self.eos_id] = 0
return scores
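# Blocks the most recently generated token ids at every step to damp immediate word-for-word repetition.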
processor_list = LogitsProcessorList()
processor_list.append(RecentTokenBlocker(window_size=3))
def randomize():
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K
MIN_NEW_TOKENS = random.randint(3, 5)
MAX_NEW_TOKENS = random.randint(40, 75)
TEMPERATURE = random.uniform(0.5, 0.9)
TOP_P = random.uniform(0.7, 0.9)
TOP_K = random.randint(40, 75)
return
def randomize_high_variance():
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K
MIN_NEW_TOKENS = random.randint(3, 20)
MAX_NEW_TOKENS = random.randint(40, 150)
TEMPERATURE = random.uniform(0.2, 0.9)
TOP_P = random.uniform(0.7, 1)
TOP_K = random.randint(40, 150)
return
logging.getLogger("transformers").setLevel(logging.ERROR)
bnb_config4bit = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
)
bnb_config8bit = BitsAndBytesConfig(load_in_8bit=True)
bnb_config = None
if USE_QUANT:
bnb_config = bnb_config4bit if QUANTIZATION == "4" else bnb_config8bit
bindings = KeyBindings()
session = PromptSession(key_bindings=bindings)
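# Key bindings: Enter submits the buffer; Ctrl+T inserts a literal newline so multi-line messages can be composed.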
@bindings.add('enter')
def _(event):
event.app.exit(result=event.app.current_buffer.text)
@bindings.add('c-t')
def _(event):
event.app.current_buffer.insert_text('\n')
tokenizer = None # define early for fallback
if GGUF:
from llama_cpp import Llama
model = Llama(
model_path=base_model_name,
n_gpu_layers=999, # same as -ngl 999
n_ctx=4096, # same as -c 4096
n_batch=2048, # same as -b 2048
n_threads=12, # same as -t 12
chat_format=GGUF_CHAT_FORMAT,
use_mmap=True, # fastest load
logits_all=False
)
tokenizer = None
else:
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
# ---- normalize config BEFORE ANY model load ----
cfg = AutoConfig.from_pretrained(base_model_name)
if getattr(cfg, "hidden_act", None) == "swiglu":
cfg.hidden_act = "silu"
if USE_BASE_MODEL_ONLY:
if USE_QUANT and bnb_config is not None:
model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
quantization_config=bnb_config,
device_map="auto",
)
else:
model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
device_map="auto",
)
model.resize_token_embeddings(len(tokenizer))
else:
# Load base first (for LoRA stack)
if USE_QUANT and bnb_config is not None:
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
quantization_config=bnb_config,
device_map="auto",
)
else:
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
device_map="auto",
)
base_model.resize_token_embeddings(len(tokenizer))
# Load frozen adapter first (if present)
if FROZEN_LORA_PATH:
base_model = PeftModel.from_pretrained(
base_model,
FROZEN_LORA_PATH,
is_trainable=False
)
print(f"Loaded frozen LoRA adapter from {FROZEN_LORA_PATH}")
# Active adapter (if provided)
if checkpoint_path:
model = PeftModel.from_pretrained(
base_model,
checkpoint_path,
subfolder=checkpoint_subfolder if checkpoint_subfolder else None,
)
else:
model = base_model
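# Streams generated text to the console as it arrives, scrubbing ChatML/special tokens from the visible output.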
class CyanPromptStreamer(TextStreamer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.first = True
self.output = ""
self._buffer = []
# Receive raw, possibly-unstable pieces → just buffer
def on_text(self, text, **kwargs):
self._buffer.append(text)
# Receive stable text spans → clean once, then print
def on_finalized_text(self, text, **kwargs):
text = "".join(self._buffer) + text
self._buffer.clear()
text = strip_begin_of_text(text)
text = clean_special_tokens(text)
#text = filter_emojis_keep_first_single(text)
#text = remove_all_emojis(text)
self.output += text
if self.first:
#print("\033[1;96m> \033[0m", end="", flush=True)
self.first = False
print(text, end="", flush=True)
def on_final_text(self, text, **kwargs):
# final flush if anything remains
if self._buffer:
self.on_finalized_text("", **kwargs)
self.output += text
print(text, end="", flush=True)
def end(self):
super().end()
self.first = True
def filter_emojis_keep_first_single(text):
# Emoji regex pattern (single emojis)
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F"
"\U0001F300-\U0001F5FF"
"\U0001F680-\U0001F6FF"
"\U0001F1E0-\U0001F1FF"
"\U00002702-\U000027B0"
"\U000024C2-\U0001F251"
"\U0001F900-\U0001F9FF"
"\U0001FA70-\U0001FAFF"
"\U0001F018-\U0001F270"
"\U0001F650-\U0001F67F"
"\U00002600-\U000026FF"
"\U00002300-\U000023FF"
"]+",
flags=re.UNICODE
)
# Pattern for emoji groups: 2 or more consecutive emojis
emoji_group_pattern = re.compile(r'(' + emoji_pattern.pattern + r'){2,}', flags=re.UNICODE)
# Remove all emoji groups first (clusters of 2+ emojis)
text = emoji_group_pattern.sub("", text)
# Now remove all single emojis except the first one
first_emoji_found = False
def replace_single_emoji(match):
nonlocal first_emoji_found
if not first_emoji_found:
first_emoji_found = True
return match.group(0) # keep first emoji
return "" # remove all others
filtered_text = emoji_pattern.sub(replace_single_emoji, text)
return filtered_text
def remove_all_emojis(text):
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F"
"\U0001F300-\U0001F5FF"
"\U0001F680-\U0001F6FF"
"\U0001F1E0-\U0001F1FF"
"\U00002702-\U000027B0"
"\U000024C2-\U0001F251"
"\U0001F900-\U0001F9FF"
"\U0001FA70-\U0001FAFF"
"\U0001F018-\U0001F270"
"\U0001F650-\U0001F67F"
"\U00002600-\U000026FF"
"\U00002300-\U000023FF"
"]+",
flags=re.UNICODE
)
return emoji_pattern.sub(r'.', text)
def show_params():
print(
f"\033[1;92m[Params]"
f" min={MIN_NEW_TOKENS} | max={MAX_NEW_TOKENS} |"
f" temp={TEMPERATURE:.2f} | p={TOP_P:.2f} | k={TOP_K}\033[0m"
)
if CODE_CHECK:
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
if checkpoint_path:
folder_name = f"{base_model_name.replace('/', '_')}_{os.path.basename(checkpoint_path).replace('/', '_')}"
else:
folder_name = base_model_name.replace('/', '_')
output_dir = os.path.join(os.getcwd(), "interface_output", folder_name)
os.makedirs(output_dir, exist_ok=True)
base_filename = f"{folder_name}.txt"
output_path = os.path.join(output_dir, base_filename)
if args.auto:
base_filename = "auto.txt"
output_path = os.path.join(output_dir, base_filename)
else:
base_filename = f"{folder_name}.txt"
output_path = os.path.join(output_dir, base_filename)
if os.path.exists(output_path):
i = 1
while True:
numbered = os.path.join(output_dir, f"{folder_name}_{i}.txt")
if not os.path.exists(numbered):
output_path = numbered
break
i += 1
if tokenizer:
if DEEPHERMES:
reserved_special_ids = [
tid for tok, tid in tokenizer.get_vocab().items()
if re.match(r"<\|reserved_special_token_\d+\|>", tok)
]
im_end_token_id = [tokenizer.convert_tokens_to_ids("<|eot_id|>")] + reserved_special_ids
else:
im_end_token_id = tokenizer.convert_tokens_to_ids("<|im_end|>")
else:
im_end_token_id = None
def get_generation_params():
return {
"do_sample": True,
"eos_token_id": (
([im_end_token_id] if isinstance(im_end_token_id, int) else (im_end_token_id or []))
+ ([tokenizer.eos_token_id] if tokenizer and tokenizer.eos_token_id is not None else [])
) or None,
"bos_token_id": tokenizer.bos_token_id if tokenizer else None,
"min_new_tokens": MIN_NEW_TOKENS,
"max_new_tokens": MAX_NEW_TOKENS,
"temperature": TEMPERATURE,
"top_p": TOP_P,
"top_k": TOP_K,
"no_repeat_ngram_size": 3,
"repetition_penalty": 1.2,
#"pad_token_id": tokenizer.eos_token_id if tokenizer else None,
"logits_processor": processor_list,
}
chat_history = []
def cleanup_and_exit():
print("\n\033[1;91m[Cleaning up — freeing VRAM]\033[0m")
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
raise SystemExit(0)
def log(text):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(colorize(text))
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[{now}]\n{text}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
def justlog(text):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[{now}]\n{text}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
def clean_special_tokens(text: str) -> str:
text = re.sub(r"<\|im_start\|>", "", text)
text = re.sub(r"<\|im_end\|>", "", text)
text = re.sub(r"<\|eot_id\|>", "", text)
text = re.sub(r"<\|reserved_special_token_\d+\|>", "", text)
text = re.sub(r"\n+\s*(<\|start_header_id\|>assistant<\|end_header_id\|>)", r"\n\1", text)
return text
def count_tokens(s):
if tokenizer:
return len(tokenizer.encode(s))
else:
return len(s.split()) # fallback for GGUF
def colorize(text):
text = text.replace("<|im_start|>system", "\033[1;93m<|im_start|>system\033[0m")
text = text.replace("<|im_start|>user", "\033[1;96m<|im_start|>user\033[0m")
text = text.replace("<|im_start|>assistant", "\033[1;95m<|im_start|>assistant\033[0m")
text = text.replace("<|im_end|>", "\033[1;1;90m<|im_end|>\033[0m")
return text
def extract_assistant_reply(full_text):
if DEEPHERMES:
start_token = "<|start_header_id|>assistant<|end_header_id|>"
start_idx = full_text.rfind(start_token)
if start_idx == -1:
return ""
start_idx += len(start_token)
# Find the earliest end among <|eot_id|> or any <|reserved_special_token_N|>
end_pattern = re.compile(r"<\|eot_id\|>|<\|reserved_special_token_\d+\|>")
m = end_pattern.search(full_text, start_idx)
end_idx = m.start() if m else len(full_text) # ← fallback to EOF
else:
start_token = "<|im_start|>assistant"
end_token = "<|im_end|>"
start_idx = full_text.rfind(start_token)
if start_idx == -1:
return ""
start_idx += len(start_token)
end_idx = full_text.find(end_token, start_idx)
if end_idx == -1:
end_idx = len(full_text) # ← fallback to EOF
return full_text[start_idx:end_idx].strip()
def lowercase_lines_and_sentences(text: str) -> str:
def should_skip(word: str) -> bool:
return len(word) >= 2 and word.isupper()
# Lowercase start of every line (unless all caps and >= 2 chars)
def lower_line_start(line: str) -> str:
if not line.strip(): # covers empty or whitespace-only
return ''
parts = line.split(maxsplit=1)
first_word = parts[0]
if should_skip(first_word):
return line
# Rebuild line if split > 1, else just lower first char
if len(parts) > 1:
rest = parts[1]
return first_word[:1].lower() + first_word[1:] + ' ' + rest
return line[:1].lower() + line[1:]
# Lowercase start of every sentence after . ? !
def lower_after_punct(match):
punct_space = match.group(1)
rest = match.group(2)
first_word = rest.split()[0]
if should_skip(first_word):
return punct_space + rest
return punct_space + rest[:1].lower() + rest[1:]
    # Apply the per-line lowering first, then lower the word after sentence-ending punctuation.
    text = "\n".join(lower_line_start(line) for line in text.split("\n"))
    return re.sub(r'([.?!]\s+)(\w+)', lower_after_punct, text)
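# e.g. "Okay sure. Sounds good. LOL" -> "okay sure. sounds good. LOL" (all-caps words such as "LOL" are left alone)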
def strip_begin_of_text(s):
prefix = "<|begin_of_text|>"
if s.startswith(prefix):
return s[len(prefix):]
return s
def strip_code_word(text: str) -> str:
    words_to_remove = [
        'api', 'program', 'programmer', 'script', 'scripts', 'code', 'coded', 'codeblock', 'coding',
        'log', 'logs', 'snippet', 'function', 'algorithm', 'web', 'library', 'python', 'bot', 'nodejs'
    ]
pattern = r'\b(?:' + '|'.join(map(re.escape, words_to_remove)) + r')\b'
return re.sub(pattern, '', text, flags=re.IGNORECASE)
def classify(text, threshold=0.8):
if not text.strip():
return False, {}
candidate_labels = ["code", "chat"]
result = classifier(text, candidate_labels)
scores = dict(zip(result['labels'], result['scores']))
is_code = scores.get("code", 0) > scores.get("chat", 0) and scores["code"] > threshold
return is_code, scores
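# Heuristic gate: a reply scoring above the threshold on "code" (and above "chat") triggers a reprompt upstream.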
# Initialize dummy values for current_prompt and user_input for the first generation_params
current_prompt = ""
user_input = ""
generation_params = get_generation_params()
if args.auto:
with open(output_path, "a", encoding="utf-8") as f:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{now}]")
f.write(f"# model: {base_model_name}\n")
f.write(f"# frozen-lora: {FROZEN_LORA_PATH or 'None'}\n")
f.write(f"# checkpoint: {checkpoint_path or 'None'}\n")
f.write(f"# args: {vars(args)}\n")
f.write(f"# generation_params: {generation_params}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
else:
with open(output_path, "w", encoding="utf-8") as f:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{now}]")
f.write(f"# model: {base_model_name}\n")
f.write(f"# frozen-lora: {FROZEN_LORA_PATH or 'None'}\n")
f.write(f"# checkpoint: {checkpoint_path or 'None'}\n")
f.write(f"# args: {vars(args)}\n")
f.write(f"# generation_params: {generation_params}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
max_retries = 5
history_enabled = True
cot_max_exchanges = 1000
cached_history_size = 1000
if not GGUF:
model.eval()
aa = getattr(model, "active_adapter", None)
if aa is not None:
print(aa)
else:
print(f"🚀 Model launched ({base_model_name})")
turn = 0
if args.auto:
cycle = ["hello", "what do you do", "wow tell me more", "/clear"]
    auto_inputs = cycle * 5  # repeat the cycle 5 times
else:
auto_inputs = []
#=============
# START LOOP
#=============
while True:
generation_params = get_generation_params()
show_params()
# print("\033[1;96m> ", end="", flush=True)
if auto_inputs:
user_input = auto_inputs.pop(0)
print(f"\033[1;96m> {user_input}\033[0m") # show it like a typed input
else:
user_input = session.prompt(multiline=True)
user_input = user_input.strip()
cmd = user_input.lower()
# --- clear/reset first (includes /c and !c aliases) ---
if cmd in ["/clear", "!clear", "/reset", "!reset", "/c", "!c"]:
chat_history.clear()
gc.collect()
torch.cuda.empty_cache()
print("\033[1;93m[Chat history cleared.]\033[0m")
justlog("[Chat history cleared.]")
# If we're in auto mode and this was the final queued item, exit now.
if args.auto and not auto_inputs:
print("\n\033[1;92m[Auto mode finished — exiting.]\033[0m")
cleanup_and_exit()
continue
# --- /back (/b): undo last exchange and reprint tail ---
if cmd in ["/back", "!back", "/b", "!b"]:
if chat_history:
u, a = chat_history.pop()
turn = max(0, turn - 1)
print("\033[1;93m[Undo] Removed last user+assistant exchange.\033[0m")
justlog("[Undo] Removed last user+assistant exchange.]")
TAIL = 3 # how many exchanges to preview
tail = chat_history[-TAIL:]
if tail:
if DEEPHERMES:
preview = "\n".join(
f"<|start_header_id|>user<|end_header_id|>\n{uu}\n"
f"<|start_header_id|>assistant<|end_header_id|>\n{aa}<|eot_id|>"
for uu, aa in tail
)
else:
preview = "\n".join(
f"<|im_start|>user\n{uu}<|im_end|>\n"
f"<|im_start|>assistant\n{aa}<|im_end|>"
for uu, aa in tail
)
print(colorize(preview))
else:
print("\033[1;90m[Chat history is now empty.]\033[0m")
else:
print("\033[1;90m[Nothing to undo.]\033[0m")
continue
# --- /h (CoT on, optional count) ---
m = re.match(r"^(/h|!h)(\s+(\d+))?$", cmd)
if m:
history_enabled = True
cot_max_exchanges = int(m.group(3)) if m.group(3) else len(chat_history)
print(f"\033[1;94m[History] Using last {cot_max_exchanges} exchanges.]\033[0m")
justlog(f"[Chat history] Using last {cot_max_exchanges} exchanges.]")
continue
# --- /d (CoT off) ---
if cmd in ["/d", "!d"]:
history_enabled = False
cot_max_exchanges = 0
print("\033[1;94m[Chat history disabled.]\033[0m")
justlog("[Chat history disabled.]")
continue
# === Multi-param updates: allow "/k 40 /t 1 /p .7 /min 1 /max 50" in one line ===
def apply_param_change(kind: str, val_s: str | None) -> None:
"""
Return: None. Applies a single param change (or toggle) in-place, printing the outcome.
Logic: Parses kind/value, clamps where needed, updates globals (min/max/temp/p/k or r/rh/stop).
Allowances: Accepts floats like '.7'; /r, /rh, /stop take no value; whitespace is allowed.
"""
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K, STOP_EXTENSION
if kind in ("r", "rh", "stop"):
if kind == "r":
randomize()
print("\033[1;96m[Randomized Parameters]\033[0m")
elif kind == "rh":
randomize_high_variance()
print("\033[1;96m[Randomized Parameters: High Variance]\033[0m")
else:
STOP_EXTENSION = not STOP_EXTENSION
print(f"\033[1;94m[Stop extension {'ENABLED' if STOP_EXTENSION else 'DISABLED'}]\033[0m")
return
if val_s is None or not val_s.strip():
# No numeric given → just show current params
show_params()
return
# Normalize numbers like ".7" to "0.7"
if val_s.startswith("."):
val_s = "0" + val_s
try:
num = float(val_s)
except ValueError:
print("\033[1;91m[Error] Invalid number.\033[0m")
return
if kind == "min":
MIN_NEW_TOKENS = max(0, int(num))
if MAX_NEW_TOKENS < MIN_NEW_TOKENS:
MAX_NEW_TOKENS = MIN_NEW_TOKENS
print(f"\033[1;96m[min → {MIN_NEW_TOKENS}]\033[0m")
elif kind == "max":
MAX_NEW_TOKENS = max(1, int(num))
if MIN_NEW_TOKENS > MAX_NEW_TOKENS:
MIN_NEW_TOKENS = MAX_NEW_TOKENS
print(f"\033[1;96m[max → {MAX_NEW_TOKENS}]\033[0m")
elif kind in ("temp", "t"):
TEMPERATURE = max(0.01, min(float(num), 2.0))
print(f"\033[1;96m[temp → {TEMPERATURE:.2f}]\033[0m")
elif kind == "p":
TOP_P = max(0.05, min(float(num), 1.0))
print(f"\033[1;96m[p → {TOP_P:.2f}]\033[0m")
elif kind == "k":
TOP_K = max(0, int(num))
print(f"\033[1;96m[k → {TOP_K}]\033[0m")
def parse_and_apply_multi_params(cmd: str) -> bool:
"""
Return: True if at least one /param token was found and applied, else False.
Logic: Scans the whole line for repeated tokens /(min|max|temp|t|p|k|r|rh|stop) with optional numbers; applies in order.
Allowances: Free spacing; supports floats like '.7'; mixed toggles (/r /rh /stop) with numeric params in one command.
"""
# find all occurrences anywhere in the line
        pattern = re.compile(
            r'/(min|max|temp|t|p|k|rh|r|stop)(?:\s+([+-]?(?:\d+(?:\.\d+)?|\.\d+)))?',  # "rh" must precede "r" so /rh is not read as /r
            flags=re.IGNORECASE
        )
found = False
for kind, val in pattern.findall(cmd):
found = True
apply_param_change(kind.lower(), val if val else None)
return found
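    # Example: "/temp .7 /k 40 /stop" sets temperature and top-k, then toggles the stop-extension flag, in order.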
if parse_and_apply_multi_params(cmd):
generation_params = get_generation_params()
continue
limited_history_for_prompt = chat_history[-cached_history_size:]
if history_enabled and cot_max_exchanges > 0:
limited_history = limited_history_for_prompt[-cot_max_exchanges:]
if DEEPHERMES:
context = "\n".join(
f"<|start_header_id|>user<|end_header_id|>\n{u}\n<|start_header_id|>assistant<|end_header_id|>\n{a}"
for u, a in limited_history
) + "\n"
else:
context = "\n".join(
f"<|im_start|>user\n{u}<|im_end|>\n<|im_start|>assistant\n{a}<|im_end|>"
for u, a in limited_history
) + "\n"
else:
context = ""
if user_input.startswith("<|im_start|>"):
current_prompt = f"{user_input}\n"
elif DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{context}<|start_header_id|>user<|end_header_id|>\n{user_input}\n<|start_header_id|>assistant<|end_header_id|>\n"
else:
if turn == 0:
current_prompt = f"{SYSTEM_PROMPT}{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
if BLANK:
current_prompt = user_input
if JUST_SYSTEM_PROMPT:
if DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{user_input}"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{user_input}"
if NO_SYSTEM_PROMPT:
if DEEPHERMES:
current_prompt = f"{context}<|start_header_id|>user<|end_header_id|>\n{user_input}\n<|start_header_id|>assistant<|end_header_id|>\n"
else:
current_prompt = f"{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
if NO_ASSISTANT_PROMPT:
if DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{context}<|start_header_id|>user<|end_header_id|>\n{user_input}"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}"
while count_tokens(current_prompt) > 3000 and chat_history:
chat_history.pop(0)
limited_history_for_prompt = chat_history[-cached_history_size:]
if history_enabled and cot_max_exchanges > 0:
limited_history = limited_history_for_prompt[-cot_max_exchanges:]
context = "\n".join(
f"<|im_start|>user\n{u}<|im_end|>\n<|im_start|>assistant\n{a}<|im_end|>"
for u, a in limited_history
) + "\n"
else:
context = ""
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant"
if CODE_CHECK:
retry_count = 0
reply = ""
while retry_count < max_retries:
if GGUF:
output_stream = model.create_chat_completion(
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_input}
],
temperature=TEMPERATURE,
top_p=TOP_P,
top_k=TOP_K,
max_tokens=MAX_NEW_TOKENS,
stream=True
)
text = ""
for chunk in output_stream:
delta = chunk["choices"][0]["delta"].get("content", "")
print(delta, end="", flush=True)
text += delta
else:
inputs = tokenizer(current_prompt, return_tensors="pt").to(model.device)
streamer = CyanPromptStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)
with torch.no_grad():
output_ids = model.generate(
**inputs,
**generation_params,
streamer=streamer
)
text = tokenizer.decode(output_ids[0], skip_special_tokens=False)
reply = extract_assistant_reply(text)
stripedreply = strip_code_word(reply)
text = strip_begin_of_text(text)
if not BLANK:
is_code, scores = classify(stripedreply)
if is_code:
retry_count += 1
log(f"\033[1;91mClassifier scores: code={scores.get('code', 0):.3f}, chat={scores.get('chat', 0):.3f}\033[0m")
log(f"\033[1;91mAttempt {retry_count} reply:\n{text}\n\033[0m")
log(f"\033[1;91mAttempt {retry_count} stripped reply:\n{reply}\n\033[0m")
log(f"\033[1;92;102m[Trying to code. Reprompting... {retry_count}/{max_retries}]\033[0m")
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[Trying to code. Reprompting... {retry_count}/{max_retries}]\n")
else:
break
final_check, final_scores = classify(text)
if retry_count == max_retries and final_check:
log(f"\033[1;91mFinal classifier scores: code={final_scores.get('code', 0):.3f}, chat={final_scores.get('chat', 0):.3f}\033[0m")
polite_message = "Sorry, Im not able to provide code snippets."
log(polite_message)
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"{polite_message}\n")
if history_enabled:
chat_history.append((user_input, polite_message))
else:
if GGUF:
output_stream = model.create_chat_completion(
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_input}
],
temperature=TEMPERATURE,
top_p=TOP_P,
top_k=TOP_K,
max_tokens=MAX_NEW_TOKENS,
stream=True # Enable streaming
)
text = ""
for chunk in output_stream:
delta = chunk["choices"][0]["delta"].get("content", "")
print(delta, end="", flush=True) # Optional: stream live to console
text += delta
else:
inputs = tokenizer(current_prompt, return_tensors="pt").to(model.device)
streamer = CyanPromptStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True, clean_up_tokenization_spaces=True)
with torch.no_grad():
try:
print("\033[1;35m> \033[0m ", end="", flush=True)
output_ids = generate_prefer_short_then_extend(
model, tokenizer,
**inputs,
**generation_params,
first_min=MIN_NEW_TOKENS,
first_max=MAX_NEW_TOKENS,
extend_step=64,
hard_cap=1024,
streamer=streamer
)
                except KeyboardInterrupt:
                    streamer.stopped = True
                    print("\n\033[1;91m[Generation interrupted by Ctrl+C — using partial output.]\033[0m")
                    # Re-encode the prompt plus whatever streamed so the decode below still has output_ids to work with.
                    output_ids = tokenizer(current_prompt + streamer.output, return_tensors="pt").input_ids
text = tokenizer.decode(output_ids[0], skip_special_tokens=False)
originalreply = extract_assistant_reply(text)
text = lowercase_lines_and_sentences(text)
reply = lowercase_lines_and_sentences(originalreply)
stripedreply = strip_code_word(reply)
text = strip_begin_of_text(text)
logtext = text
text = colorize(text)
consoletext = clean_special_tokens(text)
#text = filter_emojis_keep_first_single(text)
#text = remove_all_emojis(text)
print(f"\n{consoletext}")
justlog(logtext)
if history_enabled:
if not GGUF:
chat_history.append((user_input, reply if reply.strip() else streamer.output.strip()))
else:
chat_history.append((user_input, reply.strip()))
turn += 1
if args.auto and not auto_inputs:
print("\n\033[1;92m[Auto mode finished — exiting.]\033[0m")
break