#!/usr/bin/env python3
"""
interface.py - Discord-Hermes Model Chat Interface
Description:
This script provides an interactive CLI for chatting with Hugging Face language models,
designed for Discord-style conversational flow. It supports both Transformers and GGUF
(llama.cpp) models, with optional LoRA adapter loading and bitsandbytes quantization.
Key Features & Quick Controls:
Keyboard Shortcuts:
    Enter    submit current input
    Ctrl+T   insert a newline into the current prompt (multi-line messages)
    Ctrl+C   cancel mid-generation or mid-stream and keep partial output
    Ctrl+Z   exit cleanly, freeing VRAM and memory
Commands:
    /clear (/c, /reset, !clear, !c)   reset all context
    /back (/b, !b)                    undo the last user+assistant exchange and preview history
    /h [N] (!h)                       enable history using the last N exchanges (default: all)
    /d                                disable history
On-the-fly parameter tuning:
    /min N          set min new tokens
    /max N          set max new tokens
    /temp X (/t X)  set temperature
    /p X            set top-p
    /k N            set top-k
Randomization:
    /r     randomize params
    /rh    randomize with high variance
    /stop  toggle stopping further extension mid-generation
Generation Flow:
    Extension flow: prefer short replies (min→max tokens) but extend until EOS for natural endings
    Configurable prompt modes (system prompt, assistant prompt, blank mode)
Advanced:
    LoRA stacking: frozen base adapter + active adapter support
    Supports GGUF (llama.cpp) with selectable chat templates (--gguf-chat-format)
    Optional code detection and filtering with auto-reprompt
Arguments:
-m, --model Model path or Hugging Face repo ID (default: mookiezii/Discord-Hermes-3-8B)
-q, --quant Quantization mode: 4 or 8 (default: off). Use `-q` (no value) for 4-bit, or `-q 8` for 8-bit
-fl, --frozen-lora Model path or Hugging Face repo ID of the base LoRA adapter to load and freeze
-c, --checkpoint Model path or Hugging Face repo ID of the LoRA adapter to load
-chs, --checkpoint-subfolder Subfolder of the path or Hugging Face repo ID of the LoRA adapter to load
--deephermes Enable DeepHermes formatting instead of ChatML
--gguf Use GGUF model format with llama.cpp backend
--gguf-chat-format Chat format for GGUF models (default: "chatml")
--blank Raw user input only, no prompts/system context
-asc, --assistant-system-combo Include both system and assistant system prompts
-as, --assistant-system Use assistant system prompt instead of standard
--just-system-prompt Use only the system prompt with user input
--no-system-prompt Do not include system prompt
--no-assistant-prompt Do not include assistant prompt
--code-check Enable code detection and filtering via classifier
-au, --auto Run preset inputs (hello → what do you do → wow tell me more) 5 times with /clear in between, then exit
Usage (quick help):
python interface.py -h
USAGE / RECIPES:
Basic (Transformers, full precision):
python interface.py -m mookiezii/Discord-Hermes-3-8B
Quantization (Transformers):
# 4-bit:
python interface.py -m repo -q
# 8-bit:
python interface.py -m repo -q 8
# full precision:
python interface.py -m repo
GGUF (llama.cpp backend):
python interface.py --gguf -m /path/to/model.gguf --gguf-chat-format chatml
# alternate chat template:
python interface.py --gguf -m /path/to/model.gguf --gguf-chat-format alpaca
LoRA (frozen base + active adapter):
python interface.py -m base/model \
-fl path/to/frozen_base_lora \
-c path/to/active_adapter --checkpoint-subfolder adapter_subdir
Prompt modes:
# Raw user input, no system/assistant prompts:
python interface.py --blank
# Assistant system prompt instead of standard:
python interface.py --assistant-system
# System + assistant system combined:
python interface.py --assistant-system-combo
# Just the system prompt (no assistant prompt preface):
python interface.py --just-system-prompt
# Strip system prompt entirely:
python interface.py --no-system-prompt
# Strip assistant prompt entirely:
python interface.py --no-assistant-prompt
Format toggle:
# Use DeepHermes formatting instead of ChatML:
python interface.py --deephermes
Auto run demo + exit:
python interface.py --auto
"""
# MIT License
#
# Copyright (c) 2025 mookziei
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
import gc
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, pipeline, AutoConfig
from peft import PeftModel
from huggingface_hub import login
import argparse
import logging
import re
from transformers import TextStreamer
from datetime import datetime
from prompt_toolkit import PromptSession
from prompt_toolkit.key_binding import KeyBindings
import random
from transformers import LogitsProcessor
from transformers import LogitsProcessorList
import signal
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
parser = argparse.ArgumentParser(description="HuggingFace Model Chat Interface")
parser.add_argument("-au", "--auto", action="store_true",
help="Run preset inputs (hello → what do you do → wow tell me more) 5 times with /clear in between, then exit")
parser.add_argument("-m", "--model", default="mookiezii/Discord-Hermes-3-8B",
help="Model path or Hugging Face repo ID")
parser.add_argument(
"-q", "--quant",
nargs="?",
choices=("4", "8"),
const="4",
help="Quantization mode: 4 or 8. Omit this flag for full precision. Using `-q` alone selects 4-bit."
)
parser.add_argument("-fl", "--frozen-lora",
help="Model path or Hugging Face repo ID of the base LoRa adapter to load and freeze")
parser.add_argument("-c", "--checkpoint",
help="Model path or Hugging Face repo ID of the LoRa adapter to load")
parser.add_argument("-chs", "--checkpoint-subfolder",
help="Subfolder of the path or Hugging Face repo ID of the LoRa adapter to load")
parser.add_argument("--deephermes", action="store_true",
help="Enable DeepHermes formatting instead of ChatML")
parser.add_argument("--gguf", action="store_true",
help="Use GGUF model format (llama.cpp backend)")
parser.add_argument("--gguf-chat-format", default="chatml",
help='Chat format for GGUF models (default: "chatml")')
parser.add_argument("--blank", action="store_true",
help="Use only raw user input (no prompts/system context)")
parser.add_argument("-asc", "--assistant-system-combo", action="store_true",
help="Include both system and assistant system prompts")
parser.add_argument("-as", "--assistant-system", action="store_true",
help="Use assistant system prompt instead of standard system prompt")
parser.add_argument("--just-system-prompt", action="store_true",
help="Use only the system prompt with user input")
parser.add_argument("--no-system-prompt", action="store_true",
help="Do not include system prompt")
parser.add_argument("--no-assistant-prompt", action="store_true",
help="Do not include assistant prompt")
parser.add_argument("--code-check", action="store_true",
help="Enable code detection and filtering via classifier")
args = parser.parse_args()
# Apply args to variables
DEEPHERMES = args.deephermes
GGUF = args.gguf
GGUF_CHAT_FORMAT = args.gguf_chat_format
BLANK = args.blank
ASSISTANT_SYSTEM_COMBO = args.assistant_system_combo
ASSISTANT_SYSTEM = args.assistant_system
JUST_SYSTEM_PROMPT = args.just_system_prompt
NO_SYSTEM_PROMPT = args.no_system_prompt
NO_ASSISTANT_PROMPT = args.no_assistant_prompt
CODE_CHECK = args.code_check
QUANTIZATION = args.quant or "off"
USE_QUANT = QUANTIZATION in ("4", "8")
# ================================
base_model_name = args.model
FROZEN_LORA_PATH = args.frozen_lora
checkpoint_path = args.checkpoint or ""
checkpoint_subfolder = args.checkpoint_subfolder
USE_BASE_MODEL_ONLY = not (FROZEN_LORA_PATH or checkpoint_path)
# ================================
PLAIN_SYSTEM_PROMPT = """"""
ASSISTANT_SYSTEM_PROMPT = """"""
# ================================
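# Default sampling params: fixed values for --auto runs, lightly randomized otherwise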
if args.auto:
MIN_NEW_TOKENS = 4
MAX_NEW_TOKENS = 60
TEMPERATURE = 0.6
TOP_P = 0.9
TOP_K = 55
else:
MIN_NEW_TOKENS = random.randint(3, 5)
MAX_NEW_TOKENS = random.randint(40, 75)
TEMPERATURE = random.uniform(0.5, 0.9)
TOP_P = random.uniform(0.7, 0.9)
TOP_K = random.randint(40, 50)
# ================================
STOP_EXTENSION = False
# --- Prefer a short first reply (MIN to MAX new tokens), but extend until <|im_end|> (or EOS) ---
def generate_prefer_short_then_extend(
model,
tokenizer,
input_ids,
attention_mask=None,
first_min=MIN_NEW_TOKENS,
first_max=MAX_NEW_TOKENS,
extend_step=64,
hard_cap=1024,
**gen_kwargs,
):
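"""
Generate a short first pass (first_min..first_max new tokens), then keep extending in
extend_step-token chunks until an EOS/<|im_end|> token appears, hard_cap new tokens have
been produced, or STOP_EXTENSION has been toggled on via /stop.
"""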
import random, torch
# Resolve EOS ids (<|im_end|> for ChatML/Hermes, or tokenizer.eos_token_id)
eos_ids = []
if tokenizer.eos_token_id is not None:
eos_ids.append(tokenizer.eos_token_id)
try:
im_end_id = tokenizer.convert_tokens_to_ids("<|im_end|>")
if im_end_id is not None:
eos_ids.append(im_end_id)
except Exception:
pass
eos_ids = [eid for eid in eos_ids if eid is not None]
if not eos_ids:
raise ValueError("No EOS token id found. Make sure '<|im_end|>' exists or set tokenizer.eos_token.")
# Strip keys we set manually so no duplication happens
for key in ["max_new_tokens", "eos_token_id", "do_sample", "pad_token_id"]:
gen_kwargs.pop(key, None)
start_len = input_ids.shape[1]
seq = input_ids
attn = attention_mask if attention_mask is not None else torch.ones_like(input_ids)
budget = random.randint(first_min, first_max)
while True:
out = model.generate(
input_ids=seq,
attention_mask=attn,
max_new_tokens=budget,
eos_token_id=eos_ids[0],
pad_token_id=tokenizer.pad_token_id or eos_ids[0],
return_dict_in_generate=True,
**gen_kwargs,
)
seq = out.sequences
attn = torch.ones_like(seq)
new_tokens = seq[0, start_len:].tolist()
if any(t in eos_ids for t in new_tokens):
break
if len(new_tokens) >= hard_cap:
break
if globals().get("STOP_EXTENSION", False):
break
budget = extend_step
return seq
if not checkpoint_path and not FROZEN_LORA_PATH:  # no adapters given → base model only
USE_BASE_MODEL_ONLY = True
checkpoint_path = ""
else:
USE_BASE_MODEL_ONLY = False
if ASSISTANT_SYSTEM:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>assistant<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>assistant\n{PLAIN_SYSTEM_PROMPT}<|im_end|>"
elif ASSISTANT_SYSTEM_COMBO:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>system<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}<|start_header_id|>assistant<|end_header_id|>\n{ASSISTANT_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>system\n{PLAIN_SYSTEM_PROMPT}<|im_end|>\n<|im_start|>assistant\n{ASSISTANT_SYSTEM_PROMPT}<|im_end|>"
else:
if DEEPHERMES:
SYSTEM_PROMPT = f"<|start_header_id|>system<|end_header_id|>\n{PLAIN_SYSTEM_PROMPT}"
else:
SYSTEM_PROMPT = f"<|im_start|>system\n{PLAIN_SYSTEM_PROMPT}<|im_end|>"
def handle_sigstp(signum, frame):
print("\n\033[1;91m[Received Ctrl+Z — exiting.]\033[0m")
torch.cuda.empty_cache()
exit(0)
signal.signal(signal.SIGTSTP, handle_sigstp)
class RecentTokenBlocker(LogitsProcessor):
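"""Logits processor that heavily penalizes the last `window_size` generated tokens to curb immediate repetition, falling back to EOS if every candidate gets blocked."""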
def __init__(self, window_size: int, eos_id: int | None = None):
self.window_size = window_size
self.eos_id = eos_id
def __call__(self, input_ids, scores):
# Penalize last N tokens (use a big negative instead of -inf)
recent_tokens = input_ids[0, -self.window_size:].tolist()
if recent_tokens:
for token_id in set(recent_tokens):
if 0 <= token_id < scores.shape[-1]:
scores[0, token_id] = -1e9
# Sanitize logits
scores = torch.nan_to_num(scores, neginf=-1e9)
# If everything is blocked, fallback to EOS
if torch.all(scores[0] <= -9e8):
scores[0].fill_(-1e9)
if self.eos_id is not None and 0 <= self.eos_id < scores.shape[-1]:
scores[0, self.eos_id] = 0
return scores
processor_list = LogitsProcessorList()
processor_list.append(RecentTokenBlocker(window_size=3))
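# /r and /rh re-roll the sampling parameters within a narrow or a wider range, respectively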
def randomize():
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K
MIN_NEW_TOKENS = random.randint(3, 5)
MAX_NEW_TOKENS = random.randint(40, 75)
TEMPERATURE = random.uniform(0.5, 0.9)
TOP_P = random.uniform(0.7, 0.9)
TOP_K = random.randint(40, 75)
return
def randomize_high_variance():
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K
MIN_NEW_TOKENS = random.randint(3, 20)
MAX_NEW_TOKENS = random.randint(40, 150)
TEMPERATURE = random.uniform(0.2, 0.9)
TOP_P = random.uniform(0.7, 1)
TOP_K = random.randint(40, 150)
return
logging.getLogger("transformers").setLevel(logging.ERROR)
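# bitsandbytes quantization configs: 4-bit NF4 with double quantization, or plain 8-bit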
bnb_config4bit = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_use_double_quant=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
)
bnb_config8bit = BitsAndBytesConfig(load_in_8bit=True)
bnb_config = None
if USE_QUANT:
bnb_config = bnb_config4bit if QUANTIZATION == "4" else bnb_config8bit
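# prompt_toolkit session with custom key bindings: Enter submits, Ctrl+T inserts a newline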
bindings = KeyBindings()
session = PromptSession(key_bindings=bindings)
@bindings.add('enter')
def _(event):
event.app.exit(result=event.app.current_buffer.text)
@bindings.add('c-t')
def _(event):
event.app.current_buffer.insert_text('\n')
tokenizer = None # define early for fallback
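# Model loading: llama.cpp backend for GGUF files, otherwise Transformers with optional LoRA stacking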
if GGUF:
from llama_cpp import Llama
model = Llama(
model_path=base_model_name,
n_gpu_layers=999, # same as -ngl 999
n_ctx=4096, # same as -c 4096
n_batch=2048, # same as -b 2048
n_threads=12, # same as -t 12
chat_format=GGUF_CHAT_FORMAT,
use_mmap=True, # fastest load
logits_all=False
)
tokenizer = None
else:
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
# ---- normalize config BEFORE ANY model load ----
cfg = AutoConfig.from_pretrained(base_model_name)
if getattr(cfg, "hidden_act", None) == "swiglu":
cfg.hidden_act = "silu"
if USE_BASE_MODEL_ONLY:
if USE_QUANT and bnb_config is not None:
model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
quantization_config=bnb_config,
device_map="auto",
)
else:
model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
device_map="auto",
)
model.resize_token_embeddings(len(tokenizer))
else:
# Load base first (for LoRA stack)
if USE_QUANT and bnb_config is not None:
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
quantization_config=bnb_config,
device_map="auto",
)
else:
base_model = AutoModelForCausalLM.from_pretrained(
base_model_name,
config=cfg,
torch_dtype=torch.float16,
device_map="auto",
)
base_model.resize_token_embeddings(len(tokenizer))
# Load frozen adapter first (if present)
if FROZEN_LORA_PATH:
base_model = PeftModel.from_pretrained(
base_model,
FROZEN_LORA_PATH,
is_trainable=False
)
print(f"Loaded frozen LoRA adapter from {FROZEN_LORA_PATH}")
# Active adapter (if provided)
if checkpoint_path:
model = PeftModel.from_pretrained(
base_model,
checkpoint_path,
subfolder=checkpoint_subfolder if checkpoint_subfolder else None,
)
else:
model = base_model
class CyanPromptStreamer(TextStreamer):
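"""TextStreamer subclass that strips special tokens from streamed text before printing it, accumulating the cleaned reply in self.output."""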
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.first = True
self.output = ""
self._buffer = []
# Receive raw, possibly-unstable pieces → just buffer
def on_text(self, text, **kwargs):
self._buffer.append(text)
# Receive stable text spans → clean once, then print
def on_finalized_text(self, text, **kwargs):
text = "".join(self._buffer) + text
self._buffer.clear()
text = strip_begin_of_text(text)
text = clean_special_tokens(text)
#text = filter_emojis_keep_first_single(text)
#text = remove_all_emojis(text)
self.output += text
if self.first:
#print("\033[1;96m> \033[0m", end="", flush=True)
self.first = False
print(text, end="", flush=True)
def on_final_text(self, text, **kwargs):
# final flush if anything remains
if self._buffer:
self.on_finalized_text("", **kwargs)
self.output += text
print(text, end="", flush=True)
def end(self):
super().end()
self.first = True
def filter_emojis_keep_first_single(text):
# Emoji regex pattern (single emojis)
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F"
"\U0001F300-\U0001F5FF"
"\U0001F680-\U0001F6FF"
"\U0001F1E0-\U0001F1FF"
"\U00002702-\U000027B0"
"\U000024C2-\U0001F251"
"\U0001F900-\U0001F9FF"
"\U0001FA70-\U0001FAFF"
"\U0001F018-\U0001F270"
"\U0001F650-\U0001F67F"
"\U00002600-\U000026FF"
"\U00002300-\U000023FF"
"]+",
flags=re.UNICODE
)
# Pattern for emoji groups: 2 or more consecutive emojis
emoji_group_pattern = re.compile(r'(' + emoji_pattern.pattern + r'){2,}', flags=re.UNICODE)
# Remove all emoji groups first (clusters of 2+ emojis)
text = emoji_group_pattern.sub("", text)
# Now remove all single emojis except the first one
first_emoji_found = False
def replace_single_emoji(match):
nonlocal first_emoji_found
if not first_emoji_found:
first_emoji_found = True
return match.group(0) # keep first emoji
return "" # remove all others
filtered_text = emoji_pattern.sub(replace_single_emoji, text)
return filtered_text
def remove_all_emojis(text):
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F"
"\U0001F300-\U0001F5FF"
"\U0001F680-\U0001F6FF"
"\U0001F1E0-\U0001F1FF"
"\U00002702-\U000027B0"
"\U000024C2-\U0001F251"
"\U0001F900-\U0001F9FF"
"\U0001FA70-\U0001FAFF"
"\U0001F018-\U0001F270"
"\U0001F650-\U0001F67F"
"\U00002600-\U000026FF"
"\U00002300-\U000023FF"
"]+",
flags=re.UNICODE
)
return emoji_pattern.sub(r'.', text)
def show_params():
print(
f"\033[1;92m[Params]"
f" min={MIN_NEW_TOKENS} | max={MAX_NEW_TOKENS} |"
f" temp={TEMPERATURE:.2f} | p={TOP_P:.2f} | k={TOP_K}\033[0m"
)
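# --code-check: zero-shot classifier used to flag code-like replies and trigger a reprompt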
if CODE_CHECK:
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
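# Per-run transcript path under ./interface_output/, named after the model (and checkpoint, if any)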
if checkpoint_path:
folder_name = f"{base_model_name.replace('/', '_')}_{os.path.basename(checkpoint_path).replace('/', '_')}"
else:
folder_name = base_model_name.replace('/', '_')
output_dir = os.path.join(os.getcwd(), "interface_output", folder_name)
os.makedirs(output_dir, exist_ok=True)
base_filename = f"{folder_name}.txt"
output_path = os.path.join(output_dir, base_filename)
if args.auto:
base_filename = "auto.txt"
output_path = os.path.join(output_dir, base_filename)
else:
base_filename = f"{folder_name}.txt"
output_path = os.path.join(output_dir, base_filename)
if os.path.exists(output_path):
i = 1
while True:
numbered = os.path.join(output_dir, f"{folder_name}_{i}.txt")
if not os.path.exists(numbered):
output_path = numbered
break
i += 1
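# Resolve end-of-turn token ids: <|eot_id|> plus reserved specials for DeepHermes, <|im_end|> for ChatML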
if tokenizer:
if DEEPHERMES:
reserved_special_ids = [
tid for tok, tid in tokenizer.get_vocab().items()
if re.match(r"<\|reserved_special_token_\d+\|>", tok)
]
im_end_token_id = [tokenizer.convert_tokens_to_ids("<|eot_id|>")] + reserved_special_ids
else:
im_end_token_id = tokenizer.convert_tokens_to_ids("<|im_end|>")
else:
im_end_token_id = None
def get_generation_params():
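"""Build the transformers generate() kwargs from the current global sampling settings."""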
return {
"do_sample": True,
"eos_token_id": (
([im_end_token_id] if isinstance(im_end_token_id, int) else (im_end_token_id or []))
+ ([tokenizer.eos_token_id] if tokenizer and tokenizer.eos_token_id is not None else [])
) or None,
"bos_token_id": tokenizer.bos_token_id if tokenizer else None,
"min_new_tokens": MIN_NEW_TOKENS,
"max_new_tokens": MAX_NEW_TOKENS,
"temperature": TEMPERATURE,
"top_p": TOP_P,
"top_k": TOP_K,
"no_repeat_ngram_size": 3,
"repetition_penalty": 1.2,
#"pad_token_id": tokenizer.eos_token_id if tokenizer else None,
"logits_processor": processor_list,
}
chat_history = []
def cleanup_and_exit():
print("\n\033[1;91m[Cleaning up — freeing VRAM]\033[0m")
gc.collect()
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
raise SystemExit(0)
def log(text):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(colorize(text))
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[{now}]\n{text}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
def justlog(text):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[{now}]\n{text}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
def clean_special_tokens(text: str) -> str:
text = re.sub(r"<\|im_start\|>", "", text)
text = re.sub(r"<\|im_end\|>", "", text)
text = re.sub(r"<\|eot_id\|>", "", text)
text = re.sub(r"<\|reserved_special_token_\d+\|>", "", text)
text = re.sub(r"\n+\s*(<\|start_header_id\|>assistant<\|end_header_id\|>)", r"\n\1", text)
return text
def count_tokens(s):
if tokenizer:
return len(tokenizer.encode(s))
else:
return len(s.split()) # fallback for GGUF
def colorize(text):
text = text.replace("<|im_start|>system", "\033[1;93m<|im_start|>system\033[0m")
text = text.replace("<|im_start|>user", "\033[1;96m<|im_start|>user\033[0m")
text = text.replace("<|im_start|>assistant", "\033[1;95m<|im_start|>assistant\033[0m")
text = text.replace("<|im_end|>", "\033[1;1;90m<|im_end|>\033[0m")
return text
def extract_assistant_reply(full_text):
if DEEPHERMES:
start_token = "<|start_header_id|>assistant<|end_header_id|>"
start_idx = full_text.rfind(start_token)
if start_idx == -1:
return ""
start_idx += len(start_token)
# Find the earliest end among <|eot_id|> or any <|reserved_special_token_N|>
end_pattern = re.compile(r"<\|eot_id\|>|<\|reserved_special_token_\d+\|>")
m = end_pattern.search(full_text, start_idx)
end_idx = m.start() if m else len(full_text) # ← fallback to EOF
else:
start_token = "<|im_start|>assistant"
end_token = "<|im_end|>"
start_idx = full_text.rfind(start_token)
if start_idx == -1:
return ""
start_idx += len(start_token)
end_idx = full_text.find(end_token, start_idx)
if end_idx == -1:
end_idx = len(full_text) # ← fallback to EOF
return full_text[start_idx:end_idx].strip()
def lowercase_lines_and_sentences(text: str) -> str:
def should_skip(word: str) -> bool:
return len(word) >= 2 and word.isupper()
# Lowercase start of every line (unless all caps and >= 2 chars)
def lower_line_start(line: str) -> str:
if not line.strip(): # covers empty or whitespace-only
return ''
parts = line.split(maxsplit=1)
first_word = parts[0]
if should_skip(first_word):
return line
# Rebuild line if split > 1, else just lower first char
if len(parts) > 1:
rest = parts[1]
return first_word[:1].lower() + first_word[1:] + ' ' + rest
return line[:1].lower() + line[1:]
# Lowercase start of every sentence after . ? !
def lower_after_punct(match):
punct_space = match.group(1)
rest = match.group(2)
first_word = rest.split()[0]
if should_skip(first_word):
return punct_space + rest
return punct_space + rest[:1].lower() + rest[1:]
return re.sub(r'([.?!]\s+)(\w+)', lower_after_punct, text)
def strip_begin_of_text(s):
prefix = "<|begin_of_text|>"
if s.startswith(prefix):
return s[len(prefix):]
return s
def strip_code_word(text: str) -> str:
import re
words_to_remove = [
'api', 'program', 'programmer', 'script', 'scripts', 'code', 'coded', 'codeblock', 'coding', 'log', 'logs', 'snippet', 'function',
'script', 'algorithm', 'web', 'library', 'python', 'bot' , 'nodejs'
]
pattern = r'\b(?:' + '|'.join(map(re.escape, words_to_remove)) + r')\b'
return re.sub(pattern, '', text, flags=re.IGNORECASE)
def classify(text, threshold=0.8):
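"""Zero-shot classify text as 'code' vs 'chat'; returns (is_code, label→score dict)."""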
if not text.strip():
return False, {}
candidate_labels = ["code", "chat"]
result = classifier(text, candidate_labels)
scores = dict(zip(result['labels'], result['scores']))
is_code = scores.get("code", 0) > scores.get("chat", 0) and scores["code"] > threshold
return is_code, scores
# Initialize dummy values for current_prompt and user_input for the first generation_params
current_prompt = ""
user_input = ""
generation_params = get_generation_params()
if args.auto:
with open(output_path, "a", encoding="utf-8") as f:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{now}]")
f.write(f"# model: {base_model_name}\n")
f.write(f"# frozen-lora: {FROZEN_LORA_PATH or 'None'}\n")
f.write(f"# checkpoint: {checkpoint_path or 'None'}\n")
f.write(f"# args: {vars(args)}\n")
f.write(f"# generation_params: {generation_params}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
else:
with open(output_path, "w", encoding="utf-8") as f:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{now}]")
f.write(f"# model: {base_model_name}\n")
f.write(f"# frozen-lora: {FROZEN_LORA_PATH or 'None'}\n")
f.write(f"# checkpoint: {checkpoint_path or 'None'}\n")
f.write(f"# args: {vars(args)}\n")
f.write(f"# generation_params: {generation_params}\n\n────────────────────────────────────────────────────────────────────────────\n\n")
max_retries = 5
history_enabled = True
cot_max_exchanges = 1000
cached_history_size = 1000
if not GGUF:
model.eval()
aa = getattr(model, "active_adapter", None)
if aa is not None:
print(aa)
else:
print(f"🚀 Model launched ({base_model_name})")
turn = 0
if args.auto:
cycle = ["hello", "what do you do", "wow tell me more", "/clear"]
auto_inputs = cycle * 5 # repeat the 4-input cycle 5 times
else:
auto_inputs = []
#=============
# START LOOP
#=============
while True:
generation_params = get_generation_params()
show_params()
# print("\033[1;96m> ", end="", flush=True)
if auto_inputs:
user_input = auto_inputs.pop(0)
print(f"\033[1;96m> {user_input}\033[0m") # show it like a typed input
else:
user_input = session.prompt(multiline=True)
user_input = user_input.strip()
cmd = user_input.lower()
# --- clear/reset first (includes /c and !c aliases) ---
if cmd in ["/clear", "!clear", "/reset", "!reset", "/c", "!c"]:
chat_history.clear()
gc.collect()
torch.cuda.empty_cache()
print("\033[1;93m[Chat history cleared.]\033[0m")
justlog("[Chat history cleared.]")
# If we're in auto mode and this was the final queued item, exit now.
if args.auto and not auto_inputs:
print("\n\033[1;92m[Auto mode finished — exiting.]\033[0m")
cleanup_and_exit()
continue
# --- /back (/b): undo last exchange and reprint tail ---
if cmd in ["/back", "!back", "/b", "!b"]:
if chat_history:
u, a = chat_history.pop()
turn = max(0, turn - 1)
print("\033[1;93m[Undo] Removed last user+assistant exchange.\033[0m")
justlog("[Undo] Removed last user+assistant exchange.]")
TAIL = 3 # how many exchanges to preview
tail = chat_history[-TAIL:]
if tail:
if DEEPHERMES:
preview = "\n".join(
f"<|start_header_id|>user<|end_header_id|>\n{uu}\n"
f"<|start_header_id|>assistant<|end_header_id|>\n{aa}<|eot_id|>"
for uu, aa in tail
)
else:
preview = "\n".join(
f"<|im_start|>user\n{uu}<|im_end|>\n"
f"<|im_start|>assistant\n{aa}<|im_end|>"
for uu, aa in tail
)
print(colorize(preview))
else:
print("\033[1;90m[Chat history is now empty.]\033[0m")
else:
print("\033[1;90m[Nothing to undo.]\033[0m")
continue
# --- /h (CoT on, optional count) ---
m = re.match(r"^(/h|!h)(\s+(\d+))?$", cmd)
if m:
history_enabled = True
cot_max_exchanges = int(m.group(3)) if m.group(3) else len(chat_history)
print(f"\033[1;94m[History] Using last {cot_max_exchanges} exchanges.]\033[0m")
justlog(f"[Chat history] Using last {cot_max_exchanges} exchanges.]")
continue
# --- /d (CoT off) ---
if cmd in ["/d", "!d"]:
history_enabled = False
cot_max_exchanges = 0
print("\033[1;94m[Chat history disabled.]\033[0m")
justlog("[Chat history disabled.]")
continue
# === Multi-param updates: allow "/k 40 /t 1 /p .7 /min 1 /max 50" in one line ===
def apply_param_change(kind: str, val_s: str | None) -> None:
"""
Return: None. Applies a single param change (or toggle) in-place, printing the outcome.
Logic: Parses kind/value, clamps where needed, updates globals (min/max/temp/p/k or r/rh/stop).
Allowances: Accepts floats like '.7'; /r, /rh, /stop take no value; whitespace is allowed.
"""
global MIN_NEW_TOKENS, MAX_NEW_TOKENS, TEMPERATURE, TOP_P, TOP_K, STOP_EXTENSION
if kind in ("r", "rh", "stop"):
if kind == "r":
randomize()
print("\033[1;96m[Randomized Parameters]\033[0m")
elif kind == "rh":
randomize_high_variance()
print("\033[1;96m[Randomized Parameters: High Variance]\033[0m")
else:
STOP_EXTENSION = not STOP_EXTENSION
print(f"\033[1;94m[Stop extension {'ENABLED' if STOP_EXTENSION else 'DISABLED'}]\033[0m")
return
if val_s is None or not val_s.strip():
# No numeric given → just show current params
show_params()
return
# Normalize numbers like ".7" to "0.7"
if val_s.startswith("."):
val_s = "0" + val_s
try:
num = float(val_s)
except ValueError:
print("\033[1;91m[Error] Invalid number.\033[0m")
return
if kind == "min":
MIN_NEW_TOKENS = max(0, int(num))
if MAX_NEW_TOKENS < MIN_NEW_TOKENS:
MAX_NEW_TOKENS = MIN_NEW_TOKENS
print(f"\033[1;96m[min → {MIN_NEW_TOKENS}]\033[0m")
elif kind == "max":
MAX_NEW_TOKENS = max(1, int(num))
if MIN_NEW_TOKENS > MAX_NEW_TOKENS:
MIN_NEW_TOKENS = MAX_NEW_TOKENS
print(f"\033[1;96m[max → {MAX_NEW_TOKENS}]\033[0m")
elif kind in ("temp", "t"):
TEMPERATURE = max(0.01, min(float(num), 2.0))
print(f"\033[1;96m[temp → {TEMPERATURE:.2f}]\033[0m")
elif kind == "p":
TOP_P = max(0.05, min(float(num), 1.0))
print(f"\033[1;96m[p → {TOP_P:.2f}]\033[0m")
elif kind == "k":
TOP_K = max(0, int(num))
print(f"\033[1;96m[k → {TOP_K}]\033[0m")
def parse_and_apply_multi_params(cmd: str) -> bool:
"""
Return: True if at least one /param token was found and applied, else False.
Logic: Scans the whole line for repeated tokens /(min|max|temp|t|p|k|r|rh|stop) with optional numbers; applies in order.
Allowances: Free spacing; supports floats like '.7'; mixed toggles (/r /rh /stop) with numeric params in one command.
"""
# find all occurrences anywhere in the line
pattern = re.compile(
r'/(min|max|temp|t|p|k|r|rh|stop)(?:\s+([+-]?(?:\d+(?:\.\d+)?|\.\d+)))?',
flags=re.IGNORECASE
)
found = False
for kind, val in pattern.findall(cmd):
found = True
apply_param_change(kind.lower(), val if val else None)
return found
if parse_and_apply_multi_params(cmd):
generation_params = get_generation_params()
continue
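# Build the conversation context from recent exchanges, then assemble this turn's prompt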
limited_history_for_prompt = chat_history[-cached_history_size:]
if history_enabled and cot_max_exchanges > 0:
limited_history = limited_history_for_prompt[-cot_max_exchanges:]
if DEEPHERMES:
context = "\n".join(
f"<|start_header_id|>user<|end_header_id|>\n{u}\n<|start_header_id|>assistant<|end_header_id|>\n{a}"
for u, a in limited_history
) + "\n"
else:
context = "\n".join(
f"<|im_start|>user\n{u}<|im_end|>\n<|im_start|>assistant\n{a}<|im_end|>"
for u, a in limited_history
) + "\n"
else:
context = ""
if user_input.startswith("<|im_start|>"):
current_prompt = f"{user_input}\n"
elif DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{context}<|start_header_id|>user<|end_header_id|>\n{user_input}\n<|start_header_id|>assistant<|end_header_id|>\n"
else:
if turn == 0:
current_prompt = f"{SYSTEM_PROMPT}{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
if BLANK:
current_prompt = user_input
if JUST_SYSTEM_PROMPT:
if DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{user_input}"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{user_input}"
if NO_SYSTEM_PROMPT:
if DEEPHERMES:
current_prompt = f"{context}<|start_header_id|>user<|end_header_id|>\n{user_input}\n<|start_header_id|>assistant<|end_header_id|>\n"
else:
current_prompt = f"{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant\n"
if NO_ASSISTANT_PROMPT:
if DEEPHERMES:
current_prompt = f"{SYSTEM_PROMPT}{context}<|start_header_id|>user<|end_header_id|>\n{user_input}"
else:
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}"
while count_tokens(current_prompt) > 3000 and chat_history:
chat_history.pop(0)
limited_history_for_prompt = chat_history[-cached_history_size:]
if history_enabled and cot_max_exchanges > 0:
limited_history = limited_history_for_prompt[-cot_max_exchanges:]
context = "\n".join(
f"<|im_start|>user\n{u}<|im_end|>\n<|im_start|>assistant\n{a}<|im_end|>"
for u, a in limited_history
) + "\n"
else:
context = ""
current_prompt = f"{SYSTEM_PROMPT}\n{context}<|im_start|>user\n{user_input}<|im_end|>\n<|im_start|>assistant"
if CODE_CHECK:
retry_count = 0
reply = ""
while retry_count < max_retries:
if GGUF:
output_stream = model.create_chat_completion(
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_input}
],
temperature=TEMPERATURE,
top_p=TOP_P,
top_k=TOP_K,
max_tokens=MAX_NEW_TOKENS,
stream=True
)
text = ""
for chunk in output_stream:
delta = chunk["choices"][0]["delta"].get("content", "")
print(delta, end="", flush=True)
text += delta
else:
inputs = tokenizer(current_prompt, return_tensors="pt").to(model.device)
streamer = CyanPromptStreamer(tokenizer, skip_prompt=True, skip_special_tokens=False)
with torch.no_grad():
output_ids = model.generate(
**inputs,
**generation_params,
streamer=streamer
)
text = tokenizer.decode(output_ids[0], skip_special_tokens=False)
reply = extract_assistant_reply(text)
stripedreply = strip_code_word(reply)
text = strip_begin_of_text(text)
if not BLANK:
is_code, scores = classify(stripedreply)
if is_code:
retry_count += 1
log(f"\033[1;91mClassifier scores: code={scores.get('code', 0):.3f}, chat={scores.get('chat', 0):.3f}\033[0m")
log(f"\033[1;91mAttempt {retry_count} reply:\n{text}\n\033[0m")
log(f"\033[1;91mAttempt {retry_count} stripped reply:\n{reply}\n\033[0m")
log(f"\033[1;92;102m[Trying to code. Reprompting... {retry_count}/{max_retries}]\033[0m")
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"[Trying to code. Reprompting... {retry_count}/{max_retries}]\n")
else:
break
final_check, final_scores = classify(text)
if retry_count == max_retries and final_check:
log(f"\033[1;91mFinal classifier scores: code={final_scores.get('code', 0):.3f}, chat={final_scores.get('chat', 0):.3f}\033[0m")
polite_message = "Sorry, Im not able to provide code snippets."
log(polite_message)
with open(output_path, "a", encoding="utf-8") as f:
f.write(f"{polite_message}\n")
if history_enabled:
chat_history.append((user_input, polite_message))
else:
if GGUF:
output_stream = model.create_chat_completion(
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_input}
],
temperature=TEMPERATURE,
top_p=TOP_P,
top_k=TOP_K,
max_tokens=MAX_NEW_TOKENS,
stream=True # Enable streaming
)
text = ""
for chunk in output_stream:
delta = chunk["choices"][0]["delta"].get("content", "")
print(delta, end="", flush=True) # Optional: stream live to console
text += delta
else:
inputs = tokenizer(current_prompt, return_tensors="pt").to(model.device)
streamer = CyanPromptStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True, clean_up_tokenization_spaces=True)
with torch.no_grad():
try:
print("\033[1;35m> \033[0m ", end="", flush=True)
output_ids = generate_prefer_short_then_extend(
model, tokenizer,
**inputs,
**generation_params,
first_min=MIN_NEW_TOKENS,
first_max=MAX_NEW_TOKENS,
extend_step=64,
hard_cap=1024,
streamer=streamer
)
except KeyboardInterrupt:
streamer.stopped = True
print("\n\033[1;91m[Generation interrupted by Ctrl+C — using partial output.]\033[0m")
# Generation was cut off, so rebuild token ids from the streamed partial text for the decode step below
output_ids = tokenizer(current_prompt + streamer.output, return_tensors="pt").input_ids
text = tokenizer.decode(output_ids[0], skip_special_tokens=False)
originalreply = extract_assistant_reply(text)
text = lowercase_lines_and_sentences(text)
reply = lowercase_lines_and_sentences(originalreply)
stripedreply = strip_code_word(reply)
text = strip_begin_of_text(text)
logtext = text
text = colorize(text)
consoletext = clean_special_tokens(text)
#text = filter_emojis_keep_first_single(text)
#text = remove_all_emojis(text)
print(f"\n{consoletext}")
justlog(logtext)
if history_enabled:
if not GGUF:
chat_history.append((user_input, reply if reply.strip() else streamer.output.strip()))
else:
chat_history.append((user_input, reply.strip()))
turn += 1
if args.auto and not auto_inputs:
print("\n\033[1;92m[Auto mode finished — exiting.]\033[0m")
break