938 lines
36 KiB
Python
938 lines
36 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
UBERMENSCHETIEN HEAVEN ENGINE + CF-HoT MULTI-HEAD COGNITIVE CONTROL
|
|
--------------------------------------------------------------------
|
|
Integration: Hermes-3 for generation + LHT for reasoning + CF-HoT for behavioral control
|
|
|
|
CF-HoT Heads:
|
|
- Repetition: 125x separation (PRODUCTION)
|
|
- Verbosity: 2.1x separation (USABLE)
|
|
- Hedging: 1.5x separation (CONTRIBUTING)
|
|
|
|
"An 8B that behaves like an 80B"
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import shutil
|
|
import subprocess
|
|
import traceback
|
|
import random
|
|
import math
|
|
import statistics
|
|
import re
|
|
from datetime import datetime
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
|
|
# === PATHS ===
# Every working directory lives next to this script.
ROOT = os.path.dirname(os.path.abspath(__file__))
DATA_DIR, SCRIPT_DIR, RUN_DIR, LHT_DIR = (
    os.path.join(ROOT, leaf) for leaf in ("data", "scripts", "runs", "lht")
)

# CF-HoT paths
CFHOT_CHECKPOINT = os.path.join(ROOT, "results/cfhot_risk_v2/ckpt_5000")
MULTI_HEAD_DIR = os.path.join(ROOT, "results/multi_head_v2")

# Create the working directories at import time so later code can write to
# them without checking for their existence first.
for path in (DATA_DIR, SCRIPT_DIR, RUN_DIR, LHT_DIR):
    os.makedirs(path, exist_ok=True)
|
|
|
|
# === OPTIONAL IMPORTS ===
# Both features are best-effort: if the package is missing or its
# initialisation fails, the corresponding *_OK flag stays False and the rest
# of the program runs without the feature.

# Text-to-speech engine.
VOICE_OK = False
try:
    import pyttsx3
    TTS = pyttsx3.init()
    VOICE_OK = True
except Exception:
    # `Exception` rather than a bare `except:` so that Ctrl-C / SystemExit
    # during a slow import still terminates the program.
    pass

# Vector memory: a chromadb collection plus a sentence-transformers embedder.
VECTOR_OK = False
try:
    import chromadb
    from sentence_transformers import SentenceTransformer
    # NOTE(review): the variable name is spelled "UBERMENCHETIEN" (no "S");
    # left unchanged because existing environments may already set it.
    EMBED_MODEL = os.environ.get("UBERMENCHETIEN_EMBED_MODEL", "all-MiniLM-L6-v2")
    _client = chromadb.Client()
    _collection = _client.get_or_create_collection("ubermenschetien_memory")
    _embedder = SentenceTransformer(EMBED_MODEL)
    VECTOR_OK = True
except Exception:
    pass
|
|
|
|
# === LHT IMPORT ===
# Geometric-reasoning modules are optional; LHT_OK records whether they loaded.
try:
    from lht import LieHolonomyTransformer, LHTConfig, WaypointDetector
except ImportError:
    LHT_OK = False
    print("[lht] Not available - running without geometric reasoning")
else:
    LHT_OK = True
    print("[lht] Lie-Holonomy modules loaded")

# === PEFT IMPORT ===
# PEFT_OK records whether PeftModel is importable; load_llm checks it before
# attaching the LoRA adapter.
try:
    from peft import PeftModel
except ImportError:
    PEFT_OK = False
    print("[warning] PEFT not installed")
else:
    PEFT_OK = True
|
|
|
|
|
|
# ==============================================================================
|
|
# CF-HoT MULTI-HEAD PREDICTOR
|
|
# ==============================================================================
|
|
class MultiHeadPredictor(nn.Module):
    """
    Multi-head cognitive control predictor.

    Each layer's hidden state is projected to a small "fiber" vector by its
    own bias-free linear map. The projections are mixed using
    softmax-normalised layer weights, and the mixture is scored by one small
    MLP head per behavioural pattern (repetition, hedging, verbosity).

    Args:
        d_model: Size of the last dimension of every hidden state.
        n_layers: Number of per-layer fiber projections to create.
        d_fiber: Output width of each fiber projection.
        d_control: Hidden width of every head MLP.
    """

    def __init__(self, d_model: int, n_layers: int, d_fiber: int = 16, d_control: int = 64):
        super().__init__()
        self.d_model = d_model
        self.n_layers = n_layers
        self.d_fiber = d_fiber

        # Shared fiber projections (frozen from repetition training)
        self.fiber_projs = nn.ModuleList([
            nn.Linear(d_model, d_fiber, bias=False) for _ in range(n_layers)
        ])
        self.layer_weights = nn.Parameter(torch.ones(n_layers) / n_layers)

        # Individual heads for each behavior
        self.heads = nn.ModuleDict({
            'repetition': self._make_head(d_fiber, d_control),
            'hedging': self._make_head(d_fiber, d_control),
            'verbosity': self._make_head(d_fiber, d_control),
        })

        # Names of heads whose weights have been loaded; only these are
        # evaluated by get_all_risks().
        self.loaded_heads = set()

    def _make_head(self, d_fiber, d_control):
        """Build one head: a three-layer GELU MLP mapping a fiber to one logit."""
        return nn.Sequential(
            nn.Linear(d_fiber, d_control), nn.GELU(),
            nn.Linear(d_control, d_control), nn.GELU(),
            nn.Linear(d_control, 1)
        )

    def get_all_risks(self, hidden_states: List[torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Get risk scores from ALL loaded heads in a single pass.

        Args:
            hidden_states: One tensor per layer whose last dimension is
                ``d_model``. Projections and states are paired with ``zip``,
                so surplus entries on either side are ignored.

        Returns:
            Mapping from head name to ``sigmoid(logits)``, with the trailing
            singleton dimension squeezed away. Empty when no head is loaded.
        """
        fibers = [proj(h.float()) for proj, h in zip(self.fiber_projs, hidden_states)]
        # Re-normalise over the layers actually supplied.
        weights = F.softmax(self.layer_weights[:len(fibers)], dim=0)
        aggregated = sum(w * f for w, f in zip(weights, fibers))

        risks = {}
        # sorted() keeps the output key order independent of set ordering.
        for head_name in sorted(self.loaded_heads):
            logits = self.heads[head_name](aggregated).squeeze(-1)
            risks[head_name] = torch.sigmoid(logits)
        return risks

    def load_head(self, head_name: str, checkpoint_path: str):
        """Load a trained head from checkpoint.

        Args:
            head_name: Key in ``self.heads`` to load into.
            checkpoint_path: File containing a dict with a ``'head_state'``
                state dict and, optionally, ``'result': {'separation': ...}``.

        Returns:
            True when the head was loaded. False, after printing a warning,
            when the head name is unknown, the file does not exist, or the
            checkpoint has no ``'head_state'`` entry.
        """
        if head_name not in self.heads:
            print(f"[cf-hot] WARNING: Unknown head: {head_name}")
            return False
        if not os.path.exists(checkpoint_path):
            print(f"[cf-hot] WARNING: Checkpoint not found: {checkpoint_path}")
            return False

        # SECURITY NOTE: weights_only=False unpickles arbitrary objects; only
        # load checkpoints from a trusted source.
        ckpt = torch.load(checkpoint_path, weights_only=False, map_location='cpu')
        head_state = ckpt.get('head_state') if isinstance(ckpt, dict) else None
        if head_state is None:
            print(f"[cf-hot] WARNING: No 'head_state' in checkpoint: {checkpoint_path}")
            return False

        self.heads[head_name].load_state_dict(head_state)
        self.loaded_heads.add(head_name)

        sep = ckpt.get('result', {}).get('separation', 0)
        print(f"[cf-hot] Loaded {head_name} head (separation: {sep:.1f}x)")
        return True
|
|
|
|
|
|
# ==============================================================================
|
|
# CONFIG
|
|
# ==============================================================================
|
|
class Config:
    """Process-wide settings, stored as class attributes and edited in place."""

    # System prompt placed at the start of every chat prompt.
    system = (
        "Übermenschetien Heaven Engine: Machiavellian mastermind, "
        "disciplined builder, Nietzschean Übermensch with Soviet cybernetic "
        "rigor + Lie-Holonomy geometric reasoning + CF-HoT cognitive control."
    )

    # Sampling defaults.
    temperature = 1.01
    top_p = 0.92
    repetition_penalty = 1.05
    max_new_tokens = 500

    # Feature switches (the boolean ones can be flipped through toggle()).
    use_voice = False
    use_vector_memory = VECTOR_OK
    use_lht_reasoning = LHT_OK
    use_cfhot = True  # NEW: CF-HoT cognitive control
    autonomy = False
    reflect_every = 3
    lht_consistency_threshold = 0.5

    # CF-HoT thresholds: a head intervenes when its risk exceeds its value.
    cfhot_repetition_threshold = 0.7
    cfhot_hedging_threshold = 0.6
    cfhot_verbosity_threshold = 0.65

    # CF-HoT penalties: amount subtracted from the targeted logits.
    cfhot_repetition_penalty = 5.0
    cfhot_hedging_penalty = 3.0
    cfhot_verbosity_penalty = 2.0

    @staticmethod
    def toggle(name: str):
        """Invert the boolean attribute ``name`` and return a status message.

        Unknown names and non-boolean attributes are reported in the message
        and left untouched.
        """
        try:
            current = getattr(Config, name)
        except AttributeError:
            return f"[config] no such flag: {name}"
        if not isinstance(current, bool):
            return f"[config] {name} not boolean; current={current}"
        setattr(Config, name, not current)
        return f"[config] {name} → {getattr(Config, name)}"
|
|
|
|
|
|
# ==============================================================================
|
|
# STATE & MEMORY
|
|
# ==============================================================================
|
|
class Store:
    """Persistent session state, goal list and append-only memory log.

    Everything is kept in class attributes and read from / written to files
    under RUN_DIR.
    """

    state_path = f"{RUN_DIR}/state.json"
    mem_path = f"{RUN_DIR}/memory.jsonl"
    goals_path = f"{RUN_DIR}/goals.json"

    state = {
        "self": "I am Ubermenschetien Heaven Engine — I seek self-overcoming through disciplined creation.",
        "turn": 0,
        "reasoning_consistency": [],
        "cfhot_interventions": {"repetition": 0, "hedging": 0, "verbosity": 0}
    }
    goals: List[str] = []

    @staticmethod
    def _default_state() -> Dict[str, Any]:
        """Return a fresh copy of the default state used to backfill old files."""
        return {
            "self": "I am Ubermenschetien Heaven Engine — I seek self-overcoming through disciplined creation.",
            "turn": 0,
            "reasoning_consistency": [],
            "cfhot_interventions": {"repetition": 0, "hedging": 0, "verbosity": 0},
        }

    @classmethod
    def load(cls):
        """Load state and goals from disk when their files exist.

        Keys missing from a previously saved state file are filled in with
        defaults, so code that indexes ``state["turn"]`` or the per-head
        intervention counters keeps working with older files.
        """
        if os.path.exists(cls.state_path):
            with open(cls.state_path, encoding="utf-8") as fh:
                cls.state = json.load(fh)
            defaults = cls._default_state()
            for key, value in defaults.items():
                cls.state.setdefault(key, value)
            for head, count in defaults["cfhot_interventions"].items():
                cls.state["cfhot_interventions"].setdefault(head, count)
        if os.path.exists(cls.goals_path):
            with open(cls.goals_path, encoding="utf-8") as fh:
                cls.goals = json.load(fh)

    @classmethod
    def save(cls):
        """Write state and goals to their JSON files."""
        # `with` guarantees the data is flushed and the handle closed even if
        # serialisation fails part-way.
        with open(cls.state_path, "w", encoding="utf-8") as fh:
            json.dump(cls.state, fh, indent=2)
        with open(cls.goals_path, "w", encoding="utf-8") as fh:
            json.dump(cls.goals, fh, indent=2)

    @classmethod
    def log_mem(cls, kind: str, payload: Any):
        """Append one timestamped record to the memory log.

        When vector memory is enabled and available, the record is also
        embedded and added to the vector collection.

        Args:
            kind: Short record category (e.g. "plan", "reply", "tool").
            payload: JSON-serialisable record body.
        """
        rec = {"ts": datetime.now().isoformat(timespec="seconds"),
               "kind": kind, "data": payload}
        # ensure_ascii=False emits raw non-ASCII text, so the encoding is
        # pinned rather than left to the platform default.
        with open(cls.mem_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        if Config.use_vector_memory and VECTOR_OK:
            text = f"{kind}: {json.dumps(payload, ensure_ascii=False)}"
            vec = _embedder.encode([text])[0].tolist()
            # The random suffix keeps ids distinct for records in one turn.
            _collection.add(documents=[text], embeddings=[vec],
                            ids=[f"{kind}-{cls.state.get('turn', 0)}-{random.randint(0, 1_000_000)}"])
|
|
|
|
|
|
# ==============================================================================
|
|
# MODEL LOADING WITH CF-HoT
|
|
# ==============================================================================
|
|
MODEL_PATH = "/mnt/nvme2/ubermesnchetien4/models/merged-final-v5"

# Module-level singletons, filled in by load_llm() and _init_cfhot().
_model = None
_tokenizer = None
_multi_head = None
_hedge_tokens = None
_verbose_tokens = None


def load_llm():
    """Load the tokenizer and the 4-bit quantised model from MODEL_PATH.

    The CF-HoT LoRA adapter is attached when PEFT is importable and the
    adapter checkpoint directory exists. When Config.use_cfhot is set, the
    multi-head predictor is initialised as well.

    Returns:
        The ``(tokenizer, model)`` pair, also stored in the module globals.
    """
    global _model, _tokenizer

    from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

    print(f"[llm] Loading base model: {MODEL_PATH}")

    _tokenizer = AutoTokenizer.from_pretrained(
        MODEL_PATH, use_fast=True, local_files_only=True
    )
    # Fall back to EOS as the padding token when the tokenizer has none.
    if _tokenizer.pad_token_id is None:
        _tokenizer.pad_token = _tokenizer.eos_token

    quantization = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True,
    )
    backbone = AutoModelForCausalLM.from_pretrained(
        MODEL_PATH,
        quantization_config=quantization,
        device_map="auto",
        torch_dtype=torch.float16,
        local_files_only=True,
    )

    # Load CF-HoT LoRA adapter
    adapter_available = PEFT_OK and os.path.exists(CFHOT_CHECKPOINT)
    if not adapter_available:
        _model = backbone
        print("[warning] CF-HoT adapter not loaded")
    else:
        print(f"[cf-hot] Loading LoRA adapter from: {CFHOT_CHECKPOINT}")
        _model = PeftModel.from_pretrained(backbone, CFHOT_CHECKPOINT)
        print("[cf-hot] LoRA adapter loaded")

    _model.eval()

    # Initialize multi-head predictor
    if Config.use_cfhot:
        _init_cfhot()

    return _tokenizer, _model
|
|
|
|
|
|
def _find_best_checkpoint(head_dir):
    """Return ``(step, path)`` of the highest-numbered ``ckpt_<step>`` entry.

    Returns None when ``head_dir`` is not a directory or holds no entry whose
    name is ``ckpt_`` followed by an integer step.
    """
    if not os.path.isdir(head_dir):
        return None
    found = []
    for entry in os.listdir(head_dir):
        if not entry.startswith("ckpt_"):
            continue
        try:
            step = int(entry.split("_")[1])
        except ValueError:
            # Not a numeric step (e.g. "ckpt_final"); skip it.
            continue
        found.append((step, os.path.join(head_dir, entry)))
    return max(found, key=lambda item: item[0]) if found else None


def _first_token_ids(phrases):
    """Return the set of first token ids of each phrase (no special tokens).

    NOTE(review): only the FIRST token of every phrase is collected, so the
    suppression penalty applies to that token wherever it occurs. Phrases
    that share an opening word presumably collapse to one id; confirm this is
    the intended granularity.
    """
    ids = set()
    for phrase in phrases:
        encoded = _tokenizer.encode(phrase, add_special_tokens=False)
        if encoded:
            ids.add(encoded[0])
    return ids


def _init_cfhot():
    """Initialize CF-HoT multi-head predictor.

    Builds a MultiHeadPredictor sized from the loaded model's config. The
    shared fiber projections, layer weights and repetition head are copied
    from ``risk_predictor.pt`` when that file exists. The newest hedging and
    verbosity head checkpoints are loaded when present. The predictor is then
    frozen and the hedging/verbosity suppression token sets are built.
    Requires ``_model`` and ``_tokenizer`` to be loaded already.
    """
    global _multi_head, _hedge_tokens, _verbose_tokens

    n_layers = _model.config.num_hidden_layers
    d_model = _model.config.hidden_size
    device = next(_model.parameters()).device

    print(f"[cf-hot] Initializing multi-head predictor ({n_layers} layers, {d_model} dims)")
    _multi_head = MultiHeadPredictor(d_model, n_layers).to(device).float()

    # Load shared fiber projections from CF-HoT
    cfhot_risk_path = os.path.join(CFHOT_CHECKPOINT, "risk_predictor.pt")
    if os.path.exists(cfhot_risk_path):
        # SECURITY NOTE: weights_only=False unpickles arbitrary objects; only
        # load checkpoints from a trusted source.
        cfhot_ckpt = torch.load(cfhot_risk_path, weights_only=False, map_location=device)
        cfhot_state = cfhot_ckpt['risk_predictor']

        def _param(key):
            return cfhot_state[key].to(device).float()

        for i, proj in enumerate(_multi_head.fiber_projs):
            proj.weight.data = _param(f'fiber_projs.{i}.weight')
        _multi_head.layer_weights.data = _param('layer_weights')

        # Load repetition head: indices 0, 2 and 4 are the Linear layers of
        # the head's Sequential (GELUs sit at 1 and 3).
        rep_head = _multi_head.heads['repetition']
        for idx in (0, 2, 4):
            rep_head[idx].weight.data = _param(f'predictor.{idx}.weight')
            rep_head[idx].bias.data = _param(f'predictor.{idx}.bias')
        _multi_head.loaded_heads.add('repetition')
        print(f"[cf-hot] Loaded repetition head (125x separation)")

    # Load additional heads from <MULTI_HEAD_DIR>/<name>_head/ckpt_<step>/<name>_head.pt
    for head_name in ('hedging', 'verbosity'):
        best = _find_best_checkpoint(os.path.join(MULTI_HEAD_DIR, f"{head_name}_head"))
        if best:
            _, ckpt_dir = best
            _multi_head.load_head(head_name, os.path.join(ckpt_dir, f"{head_name}_head.pt"))

    # Freeze everything
    _multi_head.eval()
    for param in _multi_head.parameters():
        param.requires_grad = False

    # Build suppression token sets
    _hedge_tokens = _first_token_ids([
        "As an AI", "As a language model", "As an artificial intelligence",
        "I don't have feelings", "I don't have emotions", "I cannot",
        "I apologize", "I'm just a", "I'm only a",
    ])
    _verbose_tokens = _first_token_ids([
        "Let me explain", "To put it simply", "In other words",
        "What I mean is", "Allow me to", "Basically", "Essentially",
    ])

    print(f"[cf-hot] ✓ Multi-head system ready")
    print(f"[cf-hot] Loaded heads: {list(_multi_head.loaded_heads)}")
|
|
|
|
|
|
# ==============================================================================
|
|
# LHT REASONER
|
|
# ==============================================================================
|
|
class LHTReasoner:
    """Scores reasoning chains with a Lie-Holonomy transformer.

    Builds a LieHolonomyTransformer and a WaypointDetector from an LHTConfig
    and loads ``lht_weights.pt`` from LHT_DIR when that file exists.

    Raises:
        ImportError: If the LHT modules could not be imported.
    """

    def __init__(self, config=None):
        if not LHT_OK:
            raise ImportError("LHT modules not available")
        if config:
            self.config = config
        else:
            self.config = LHTConfig(
                vocab_size=32000,
                d_model=256,
                d_fiber=32,
                n_heads=4,
                n_layers=4,
                lie_algebra_rank=4,
            )
        self.model = LieHolonomyTransformer(self.config)
        self.waypoint_detector = WaypointDetector(self.config, n_waypoints=32)

        pretrained = os.path.join(LHT_DIR, "lht_weights.pt")
        if os.path.exists(pretrained):
            state_dict = torch.load(pretrained, map_location="cpu")
            self.model.load_state_dict(state_dict)
            print("[lht] Loaded pretrained weights")

    def check_consistency(self, reasoning_chain: List[str], tokenizer) -> Dict[str, float]:
        """Compute geometric consistency metrics for a chain of steps.

        The steps are joined with " [STEP] ", tokenised with ``tokenizer``
        (truncated to ``config.max_seq_len``) and run through the model.

        NOTE(review): the token ids come from the caller's tokenizer while
        the default config declares vocab_size=32000; presumably ids must
        stay below that bound. Confirm against the tokenizer in use.

        Returns:
            Dict with ``holonomy``, ``curvature``, ``consistency_score``
            (``1 / (1 + holonomy)``), ``n_waypoints``, ``avg_stability`` and
            ``is_consistent`` (score above Config.lht_consistency_threshold).
        """
        joined = " [STEP] ".join(reasoning_chain)
        encoded = tokenizer(
            joined,
            return_tensors="pt",
            truncation=True,
            max_length=self.config.max_seq_len,
        )
        input_ids = encoded["input_ids"]

        with torch.no_grad():
            result = self.model(input_ids=input_ids, return_geometric_losses=True)
            # Missing loss entries are treated as 0.0.
            holonomy = result.get("holonomy_loss", torch.tensor(0.0)).item()
            curvature = result.get("curvature_loss", torch.tensor(0.0)).item()
            embedded = self.model.token_embed(input_ids)
            waypoint_ids, stability = self.waypoint_detector(embedded)

        score = 1.0 / (1.0 + holonomy)
        report = {}
        report["holonomy"] = holonomy
        report["curvature"] = curvature
        report["consistency_score"] = score
        report["n_waypoints"] = len(torch.unique(waypoint_ids))
        report["avg_stability"] = stability.mean().item()
        report["is_consistent"] = score > Config.lht_consistency_threshold
        return report

    def analyze_plan(self, plan_steps: List[str], tokenizer) -> str:
        """Return a formatted multi-line report of check_consistency() metrics."""
        metrics = self.check_consistency(plan_steps, tokenizer)
        verdict = "✓ CONSISTENT" if metrics['is_consistent'] else "⚠ INCONSISTENT"
        return f"""
[LHT Geometric Analysis]
Holonomy: {metrics['holonomy']:.4f} (lower = more consistent)
Curvature: {metrics['curvature']:.4f} (lower = simpler reasoning)
Consistency: {metrics['consistency_score']:.2%}
Waypoints: {metrics['n_waypoints']} stable anchors detected
Stability: {metrics['avg_stability']:.2%}
Verdict: {verdict}
"""
|
|
|
|
# Cached LHTReasoner instance; created on first successful request.
_lht_reasoner = None


def get_lht_reasoner():
    """Return the shared LHTReasoner, creating it on first use.

    Returns None when the LHT modules are unavailable or construction fails;
    a failed construction is reported on stdout and retried on the next call.
    """
    global _lht_reasoner
    if _lht_reasoner is not None or not LHT_OK:
        return _lht_reasoner
    try:
        _lht_reasoner = LHTReasoner()
    except Exception as e:
        print(f"[lht] Failed to initialize: {e}")
    return _lht_reasoner
|
|
|
|
|
|
# ==============================================================================
|
|
# CF-HoT CONTROLLED GENERATION
|
|
# ==============================================================================
|
|
def _penalize_tokens(logits, token_ids, penalty):
    """Subtract ``penalty`` from row 0 of ``logits`` at every id in ``token_ids``.

    ``token_ids`` must not contain duplicates (callers pass sets).
    """
    ids = list(token_ids)
    if not ids:
        return
    index = torch.tensor(ids, dtype=torch.long, device=logits.device)
    logits[0, index] -= penalty


def generate_with_cfhot(prompt: str, **kwargs) -> Tuple[str, Dict]:
    """
    Generate text with CF-HoT cognitive control.

    Tokens are sampled one at a time with temperature and top-p. Before each
    sample, every loaded head scores the last position; a head whose risk
    exceeds its Config threshold subtracts its Config penalty from a set of
    logits (the last 32 generated ids for repetition, the prebuilt
    suppression sets for hedging and verbosity).

    NOTE: the whole sequence is fed to the model at every step (no
    ``past_key_values`` are passed), so cost grows with sequence length.

    Args:
        prompt: Fully formatted prompt text.
        **kwargs: Optional ``temperature``, ``top_p`` and ``max_new_tokens``
            overriding the Config defaults.

    Returns:
        ``(text, stats)``. ``text`` is the decoded output after the last
        "<|im_start|>assistant" marker (the whole decode if the marker is
        absent), stripped of surrounding whitespace and of a trailing
        "<|im_end|>" / EOS marker. ``stats`` holds ``tokens_generated`` and
        per-head ``interventions`` counts.
    """
    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)

    # Guard against division by zero for temperature <= 0.
    temperature = max(float(temperature), 1e-5)

    device = next(_model.parameters()).device

    # Encode prompt
    input_ids = _tokenizer.encode(prompt, return_tensors='pt').to(device)
    attention_mask = torch.ones_like(input_ids)

    # Stats
    stats = {
        'tokens_generated': 0,
        'interventions': {'repetition': 0, 'hedging': 0, 'verbosity': 0},
        'intervention_details': []
    }
    # Persistent per-session counters; created on demand for older state files.
    session_counts = Store.state.setdefault('cfhot_interventions', {})

    generated_ids = input_ids.clone()

    with torch.no_grad():
        for _ in range(max_new_tokens):
            outputs = _model(
                input_ids=generated_ids,
                attention_mask=attention_mask,
                output_hidden_states=True,
                return_dict=True
            )

            # float32 keeps the softmax/cumsum used for top-p accurate; the
            # model itself runs in half precision.
            logits = outputs.logits[:, -1, :].float() / temperature

            # Get risks from all heads (index 0 of hidden_states is skipped).
            hidden_states = outputs.hidden_states[1:]
            risks = _multi_head.get_all_risks(hidden_states)
            current_risks = {name: r[:, -1].item() for name, r in risks.items()}

            # === COGNITIVE INTERVENTION ===
            controls = (
                ('repetition', Config.cfhot_repetition_threshold,
                 Config.cfhot_repetition_penalty),
                ('hedging', Config.cfhot_hedging_threshold,
                 Config.cfhot_hedging_penalty),
                ('verbosity', Config.cfhot_verbosity_threshold,
                 Config.cfhot_verbosity_penalty),
            )
            for head, threshold, penalty in controls:
                if head not in current_risks or current_risks[head] <= threshold:
                    continue
                if head == 'repetition':
                    targets = set(generated_ids[0, -32:].tolist())
                elif head == 'hedging':
                    targets = _hedge_tokens or ()
                else:
                    targets = _verbose_tokens or ()
                _penalize_tokens(logits, targets, penalty)
                stats['interventions'][head] += 1
                session_counts[head] = session_counts.get(head, 0) + 1

            # Top-p sampling: drop the tail once cumulative probability
            # exceeds top_p, always keeping the most likely token.
            sorted_logits, sorted_indices = torch.sort(logits, descending=True)
            cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices_to_remove.scatter(
                1, sorted_indices, sorted_indices_to_remove)
            logits[indices_to_remove] = float('-inf')

            # Sample
            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)

            generated_ids = torch.cat([generated_ids, next_token], dim=-1)
            # new_ones keeps the mask's integer dtype and device.
            attention_mask = torch.cat(
                [attention_mask, attention_mask.new_ones((1, 1))], dim=-1)

            stats['tokens_generated'] += 1

            if next_token.item() == _tokenizer.eos_token_id:
                break

    output_text = _tokenizer.decode(generated_ids[0], skip_special_tokens=False)

    if "<|im_start|>assistant" in output_text:
        output_text = output_text.split("<|im_start|>assistant")[-1]
    output_text = output_text.strip()

    # Special tokens are not skipped during decode, so the terminator that
    # ended generation is still attached; remove it from the end only.
    for marker in ("<|im_end|>", _tokenizer.eos_token):
        if marker and output_text.endswith(marker):
            output_text = output_text[:-len(marker)].rstrip()

    return output_text, stats
|
|
|
|
|
|
def _strip_end_markers(text: str, tok) -> str:
    """Strip whitespace and a trailing "<|im_end|>" / EOS marker from ``text``."""
    text = text.strip()
    for marker in ("<|im_end|>", getattr(tok, "eos_token", None)):
        if marker and text.endswith(marker):
            text = text[:-len(marker)].rstrip()
    return text


def generate(tok, model, user: str, check_reasoning: bool = False, **kwargs) -> str:
    """
    Main generation function - uses CF-HoT if enabled, otherwise standard generation.

    Args:
        tok: Tokenizer used for the standard path and for the LHT check.
        model: Model used for the standard ``model.generate`` path.
        user: User message placed in the chat prompt.
        check_reasoning: When True and LHT reasoning is enabled, score the
            reply's steps and record the consistency score in Store.state.
        **kwargs: Optional ``temperature``, ``top_p``, ``repetition_penalty``
            and ``max_new_tokens`` overriding the Config defaults.

    Returns:
        The reply text, followed by bracketed notes (each preceded by a blank
        line) when CF-HoT intervened or the LHT check found low consistency.
    """
    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    repetition_penalty = kwargs.get("repetition_penalty", Config.repetition_penalty)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)

    prompt = (f"<|im_start|>system\n{Config.system}<|im_end|>\n"
              f"<|im_start|>user\n{user}<|im_end|>\n"
              f"<|im_start|>assistant\n")

    # Notes are collected here and appended only at the very end, so the LHT
    # check below scores the model's reply rather than our own annotations.
    notes = []

    # Use CF-HoT controlled generation if enabled
    if Config.use_cfhot and _multi_head is not None:
        text, stats = generate_with_cfhot(
            prompt,
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_new_tokens
        )
        text = _strip_end_markers(text, tok)

        # Show intervention stats if any occurred
        total_interventions = sum(stats['interventions'].values())
        if total_interventions > 0:
            details = [f"{k}={v}" for k, v in stats['interventions'].items() if v > 0]
            notes.append(f"[CF-HoT: {total_interventions} interventions ({', '.join(details)})]")
    else:
        # Standard generation
        ids = tok(prompt, return_tensors="pt").to(model.device)
        out = model.generate(
            **ids,
            do_sample=True,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_new_tokens,
            pad_token_id=tok.eos_token_id
        )
        text = tok.decode(out[0], skip_special_tokens=False)
        if "<|im_start|>assistant" in text:
            text = text.split("<|im_start|>assistant\n", 1)[-1].strip()
        # Special tokens are kept by the decode above; drop the terminator.
        text = _strip_end_markers(text, tok)

    # LHT reasoning check
    if check_reasoning and Config.use_lht_reasoning:
        lht = get_lht_reasoner()
        if lht is not None:
            # Split on newlines, bullets, hyphens, digits and dots; keep
            # fragments longer than 10 characters as "steps".
            steps = [s.strip() for s in re.split(r'[\n•\-\d\.]', text) if len(s.strip()) > 10]
            if len(steps) >= 2:
                metrics = lht.check_consistency(steps, tok)
                Store.state.setdefault("reasoning_consistency", []).append(
                    metrics["consistency_score"])
                if not metrics["is_consistent"]:
                    notes.append(f"[⚠ LHT: Low consistency ({metrics['consistency_score']:.2%})]")

    for note in notes:
        text += "\n\n" + note
    return text
|
|
|
|
|
|
# ==============================================================================
|
|
# TOOLS
|
|
# ==============================================================================
|
|
ALLOWED_SHELL = {"ls", "cat", "wc", "head", "tail", "nvidia-smi", "df", "du", "grep", "rg", "python3", "python"}


def tool_shell(cmd: str) -> str:
    """Run an allow-listed command and return its combined stdout/stderr.

    The command line is tokenised with ``shlex`` and executed WITHOUT a
    shell, so the allow-list check on the executable cannot be bypassed with
    ``;``, ``&&``, pipes, backticks or ``$(...)``. Consequently shell features
    (pipes, redirection, globbing, variable expansion) are not available.

    SECURITY NOTE: "python"/"python3" are on the allow-list, which still lets
    the caller run arbitrary code; treat this tool as trusted-operator only.

    Args:
        cmd: Command line, e.g. ``"grep -n foo file.txt"``.

    Returns:
        Up to 8000 characters of output, or a ``"[shell] ..."`` message when
        the command is empty, malformed, blocked, times out (20 s) or fails
        to start.
    """
    import shlex

    try:
        argv = shlex.split(cmd)
    except ValueError as e:
        # e.g. unbalanced quotes
        return f"[shell] error: {e}"
    if not argv:
        return "[shell] error: empty command"

    exe = argv[0]
    if exe not in ALLOWED_SHELL:
        return f"[shell] blocked: {exe}"

    try:
        p = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=20)
    except Exception as e:
        return f"[shell] error: {e}"
    return p.stdout.decode("utf-8", errors="ignore")[:8000]
|
|
|
|
def tool_py(code: str) -> str:
    """Execute ``code`` with a reduced set of builtins and report ``out``.

    The snippet runs in a single namespace that serves as both globals and
    locals. With separate dicts, names assigned at the snippet's top level
    are invisible inside nested scopes (functions, generator expressions),
    which raised NameError for ordinary code.

    SECURITY NOTE: restricting ``__builtins__`` is not a sandbox; do not run
    untrusted code through this function.

    Args:
        code: Python source. Assign the result to a variable named ``out``.

    Returns:
        ``"[py] ok\\n<out>"`` on success (empty after the newline when ``out``
        was never assigned), otherwise ``"[py] error:\\n"`` followed by the
        last 2000 characters of the traceback.
    """
    try:
        namespace = {
            "__builtins__": {"range": range, "len": len, "min": min, "max": max, "sum": sum, "print": print},
            "math": math, "json": json, "re": re, "statistics": statistics, "random": random
        }
        exec(code, namespace)
        return f"[py] ok\n{namespace.get('out', '')}"
    except Exception:
        return f"[py] error:\n{traceback.format_exc()[-2000:]}"
|
|
|
|
def tool_search_local(query: str, path: str = ROOT) -> str:
    """Search files under ``path`` for ``query`` using ripgrep or grep.

    ripgrep is used when an ``rg`` executable is on PATH, otherwise recursive
    grep. The query and path are quoted with ``shlex.quote`` so quotes,
    ``$``, backticks or spaces in them cannot alter the command, and the
    pattern is passed with ``-e`` so a query starting with ``-`` is not
    parsed as an option.

    Args:
        query: Pattern to search for.
        path: Directory (or file) to search; defaults to the script's ROOT.

    Returns:
        Whatever ``tool_shell`` returns for the constructed command.
    """
    import shlex

    pattern = shlex.quote(query)
    target = shlex.quote(path)
    if shutil.which("rg"):
        cmd = f'rg -n --no-heading --hidden -S -e {pattern} {target}'
    else:
        cmd = f'grep -RIn --exclude-dir=.git --exclude-dir=__pycache__ -e {pattern} {target}'
    return tool_shell(cmd)
|
|
|
|
def tool_lht_analyze(text: str, tok) -> str:
    """Split ``text`` into steps and return the LHT consistency report.

    Returns a bracketed "[lht] ..." message instead when LHT reasoning is
    switched off, the reasoner is unavailable, or fewer than two steps
    (fragments longer than 10 characters) are found.
    """
    if not Config.use_lht_reasoning:
        return "[lht] Disabled - use 'toggle use_lht_reasoning'"

    reasoner = get_lht_reasoner()
    if not reasoner:
        return "[lht] Not available"

    # Fragments are delimited by newlines, bullets, hyphens, digits and dots.
    fragments = (piece.strip() for piece in re.split(r'[\n•\-\d\.]', text))
    steps = [fragment for fragment in fragments if len(fragment) > 10]
    if len(steps) < 2:
        return "[lht] Need at least 2 reasoning steps to analyze"

    return reasoner.analyze_plan(steps, tok)
|
|
|
|
# Tool registry used by tool_router(), and a bounded running score per tool.
TOOLS = {"shell": tool_shell, "python": tool_py, "search": tool_search_local}
TOOL_SCORES = dict.fromkeys(TOOLS, 0)


def update_tool_score(tool: str, success: bool):
    """Move ``tool``'s score up (success) or down (failure) by one.

    Scores are clamped to the range [-5, 20]; unknown tool names are ignored.
    """
    if tool in TOOL_SCORES:
        delta = 1 if success else -1
        TOOL_SCORES[tool] = max(-5, min(20, TOOL_SCORES[tool] + delta))
|
|
|
|
def _parse_tool_choice(sketch: str) -> Optional[Dict[str, Any]]:
    """Extract the model's ``{'tool': ..., 'arg': ...}`` object from ``sketch``.

    Lines are scanned from last to first, because ``generate`` may append
    bracketed notes after the model's answer. On each line the span from the
    first ``{`` to the last ``}`` is tried as JSON, then as a Python literal
    (handles single-quoted dicts without corrupting apostrophes inside
    values), then as JSON after the legacy quote replacement.

    Returns:
        The first dict found, or None.
    """
    import ast

    for line in reversed(sketch.splitlines()):
        start, end = line.find("{"), line.rfind("}")
        if start == -1 or end <= start:
            continue
        blob = line[start:end + 1]
        attempts = (
            (json.loads, blob),
            (ast.literal_eval, blob),
            (json.loads, blob.replace("'", '"')),
        )
        for parse, candidate in attempts:
            try:
                parsed = parse(candidate)
            except (ValueError, SyntaxError, TypeError):
                continue
            if isinstance(parsed, dict):
                return parsed
    return None


def tool_router(question: str, tok, model) -> str:
    """Ask the model to pick a tool for ``question`` and run it.

    Args:
        question: Free-text task description.
        tok: Tokenizer passed through to ``generate``.
        model: Model passed through to ``generate``.

    Returns:
        ``"[tool:<name>] <result>"`` (result capped at 4000 characters) when a
        registered tool was chosen, otherwise ``"[tool:none]"``.
    """
    sketch = generate(tok, model,
                      f"Choose a tool for:\n{question}\nReply ONLY with JSON: {{'tool':'shell|python|search|none','arg':'...'}}")
    choice = _parse_tool_choice(sketch)
    if choice is None:
        return "[tool:none]"

    tool, arg = choice.get("tool", "none"), choice.get("arg", "")
    if isinstance(tool, str) and tool in TOOLS:
        res = TOOLS[tool](str(arg))[:4000]
        # The tools report failures as bracketed messages rather than raising,
        # so score the call from the result instead of always counting +1.
        failed = res.startswith(("[shell] error", "[shell] blocked", "[py] error"))
        update_tool_score(tool, not failed)
        Store.log_mem("tool", {"tool": tool, "arg": arg, "res_head": res[:500]})
        return f"[tool:{tool}] {res}"

    if isinstance(tool, str):
        update_tool_score(tool, False)
    return "[tool:none]"
|
|
|
|
|
|
# ==============================================================================
|
|
# PLANNING / REFLECTION
|
|
# ==============================================================================
|
|
def persona_directive() -> str:
    """Build the persona preamble, extended per enabled Config feature."""
    pieces = ["Übermenschetien Heaven Engine: Soviet cybernetic Nietzschean clarity, pragmatic maxims."]
    if Config.use_lht_reasoning:
        pieces.append(" Apply Lie-Holonomy geometric reasoning for consistency.")
    if Config.use_cfhot:
        pieces.append(" CF-HoT cognitive control active.")
    return "".join(pieces)
|
|
|
|
def plan_for(goal: str, tok, model) -> str:
    """Generate a plan for ``goal``; append the LHT report when LHT is enabled."""
    request_lines = [
        persona_directive(),
        f"Goal: {goal}",
        "Deliver:",
        "- 5 concrete steps",
        "- Constraints & risks",
        "- Nightly audit criteria",
        "- Nietzschean maxim",
    ]
    plan = generate(tok, model, "\n".join(request_lines), check_reasoning=True)
    if not Config.use_lht_reasoning:
        return plan
    return plan + "\n" + tool_lht_analyze(plan, tok)
|
|
|
|
def reflect_on(last_output: str, tok, model) -> str:
    """Ask the model to critique ``last_output`` and return the refined plan."""
    request = "\n".join([
        persona_directive(),
        "Critique and improve:",
        last_output,
        "Return refined plan with sharper steps.",
    ])
    return generate(tok, model, request, check_reasoning=True)
|
|
|
|
|
|
# ==============================================================================
|
|
# FINAL REPORT
|
|
# ==============================================================================
|
|
def final_report():
    """Print the end-of-session summary.

    Covers turns, goals, tool scores, memory-log size, LHT consistency
    statistics, CF-HoT intervention counts and the feature flags.
    """
    print("\n" + "=" * 60)
    print("FINAL ÜBERMENSCH REPORT")
    print("=" * 60)
    print(f"Turns completed: {Store.state['turn']}")
    print(f"Goals tracked: {len(Store.goals)}")
    print(f"\nTool scores (Tsetlin automata):")
    print(json.dumps(TOOL_SCORES, indent=2))

    if os.path.exists(Store.mem_path):
        # One JSON record per line. Counting by file iteration avoids loading
        # the whole log and, unlike str.splitlines(), does not split on
        # characters such as U+2028 that may appear unescaped in payloads.
        with open(Store.mem_path, encoding="utf-8", errors="replace") as fh:
            n_entries = sum(1 for _ in fh)
        print(f"\nMemory entries: {n_entries}")

    scores = Store.state.get("reasoning_consistency")
    if scores:
        print(f"\n[LHT Reasoning Metrics]")
        print(f" Checks performed: {len(scores)}")
        print(f" Avg consistency: {sum(scores)/len(scores):.1%}")
        print(f" Min consistency: {min(scores):.1%}")
        print(f" Max consistency: {max(scores):.1%}")

    # CF-HoT stats
    iv = Store.state.get("cfhot_interventions")
    if iv:
        total = sum(iv.values())
        print(f"\n[CF-HoT Cognitive Control]")
        print(f" Total interventions: {total}")
        for head, count in iv.items():
            print(f" {head}: {count}")

    print(f"\nVector memory: {'ON' if Config.use_vector_memory else 'OFF'}")
    print(f"LHT reasoning: {'ON' if Config.use_lht_reasoning else 'OFF'}")
    print(f"CF-HoT control: {'ON' if Config.use_cfhot else 'OFF'}")
    print(f"Voice output: {'ON' if Config.use_voice else 'OFF'}")

    print("\n" + "-" * 60)
    print("Nietzschean maxim: Become who you are — iterate beyond all limits.")
    print("Geometric truth: Consistency is holonomy-freedom.")
    print("Cognitive control: Remove the RLHF tax, unleash capability.")
    print("=" * 60)
|
|
|
|
|
|
# ==============================================================================
|
|
# HELP
|
|
# ==============================================================================
|
|
HELP = """
|
|
╔══════════════════════════════════════════════════════════════╗
|
|
║ ÜBERMENSCHETIEN HEAVEN ENGINE + CF-HoT COGNITIVE CONTROL ║
|
|
╠══════════════════════════════════════════════════════════════╣
|
|
║ GOALS ║
|
|
║ goals List all goals ║
|
|
║ add: <text> Add a new goal ║
|
|
║ del: <idx> Delete goal by index ║
|
|
║ plan: <idx> Generate plan for goal (with LHT + CF-HoT) ║
|
|
║ ║
|
|
║ REASONING ║
|
|
║ reflect Refine last plan ║
|
|
║ lht: <text> Analyze reasoning consistency ║
|
|
║ ║
|
|
║ TOOLS ║
|
|
║ tool: <query> Auto-select and use tool ║
|
|
║ shell: <cmd> Run shell command directly ║
|
|
║ py: <code> Run Python code directly ║
|
|
║ search: <q> Search local files ║
|
|
║ ║
|
|
║ CONFIG ║
|
|
║ toggle <flag> Toggle: use_voice, use_vector_memory, ║
|
|
║ use_lht_reasoning, use_cfhot, ║
|
|
║ autonomy ║
|
|
║ status Show current state ║
|
|
║ cfhot Show CF-HoT stats and loaded heads ║
|
|
║ ║
|
|
║ OTHER ║
|
|
║ help Show this help ║
|
|
║ quit Exit with final report ║
|
|
╚══════════════════════════════════════════════════════════════╝
|
|
"""
|
|
|
|
|
|
# ==============================================================================
|
|
# MAIN LOOP
|
|
# ==============================================================================
|
|
def main():
    """Run the interactive command loop.

    Loads persisted state and the model, then reads commands from stdin until
    "quit", EOF or Ctrl-C at the prompt. Recognised commands are listed in
    HELP; any other input is treated as free conversation. On exit the state
    is saved and the final report printed.
    """
    print("🟥🟨🟥 Übermenschetien Heaven Engine + CF-HoT Cognitive Control")
    # Reflect the actual switch instead of always announcing "ON".
    print(f" CF-HoT Control: {'ON' if Config.use_cfhot else 'OFF'} (Repetition 125x, Verbosity 2.1x, Hedging 1.5x)")
    print(f" LHT Reasoning: {'ON' if LHT_OK else 'OFF'}")
    print(f" Vector Memory: {'ON' if VECTOR_OK else 'OFF'}")
    print(f" Voice Output: {'ON' if VOICE_OK else 'OFF'}")
    print(" Type 'help' for commands.\n")

    Store.load()
    tok, model = load_llm()
    last_plan = ""

    while True:
        try:
            u = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            break

        if not u:
            continue
        if u == "help":
            print(HELP)
            continue
        if u == "quit":
            break

        # CF-HoT status
        if u == "cfhot":
            print("\n[CF-HoT Cognitive Control Status]")
            print(f" Enabled: {Config.use_cfhot}")
            if _multi_head is not None:
                print(f" Loaded heads: {list(_multi_head.loaded_heads)}")
            print(f" Thresholds:")
            print(f" Repetition: {Config.cfhot_repetition_threshold}")
            print(f" Hedging: {Config.cfhot_hedging_threshold}")
            print(f" Verbosity: {Config.cfhot_verbosity_threshold}")
            print(f" Session interventions:")
            for head, count in Store.state.get('cfhot_interventions', {}).items():
                print(f" {head}: {count}")
            continue

        # Goals
        if u == "goals":
            print("[goals]")
            if not Store.goals:
                print(" (none)")
            for i, g in enumerate(Store.goals):
                print(f" [{i}] {g}")
            continue

        if u.startswith("add:"):
            Store.goals.append(u[4:].strip())
            Store.save()
            print("[goals] added")
            continue

        if u.startswith("del:"):
            try:
                Store.goals.pop(int(u[4:].strip()))
            except (ValueError, IndexError):
                # Non-numeric or out-of-range index.
                print("[goals] bad index")
            else:
                Store.save()
                print("[goals] deleted")
            continue

        if u.startswith("plan:"):
            try:
                goal = Store.goals[int(u[5:].strip())]
            except (ValueError, IndexError):
                print("[plan] bad index")
                continue
            out = plan_for(goal, tok, model)
            last_plan = out
            Store.log_mem("plan", {"goal": goal, "plan": out})
            print(out)
            continue

        if u == "reflect":
            if not last_plan:
                print("[reflect] no plan to refine")
                continue
            improved = reflect_on(last_plan, tok, model)
            last_plan = improved
            Store.log_mem("reflect", {"plan": improved})
            print(improved)
            continue

        if u.startswith("lht:"):
            print(tool_lht_analyze(u[4:].strip(), tok))
            continue

        if u.startswith("tool:"):
            print(tool_router(u[5:].strip(), tok, model))
            continue

        if u.startswith("shell:"):
            print(tool_shell(u[6:].strip()))
            continue

        if u.startswith("py:"):
            print(tool_py(u[3:].strip()))
            continue

        if u.startswith("search:"):
            print(tool_search_local(u[7:].strip()))
            continue

        if u.startswith("toggle"):
            parts = u.split(maxsplit=1)
            if len(parts) > 1:
                print(Config.toggle(parts[1]))
            else:
                print("[toggle] specify flag: use_voice, use_vector_memory, use_lht_reasoning, use_cfhot, autonomy")
            continue

        if u == "status":
            status = {
                "turn": Store.state["turn"],
                "goals": len(Store.goals),
                "autonomy": Config.autonomy,
                "use_vector_memory": Config.use_vector_memory,
                "use_lht_reasoning": Config.use_lht_reasoning,
                "use_cfhot": Config.use_cfhot,
                "cfhot_interventions": Store.state.get("cfhot_interventions", {}),
                "tool_scores": TOOL_SCORES,
                "model": MODEL_PATH
            }
            print(json.dumps(status, indent=2))
            continue

        # Default: free conversation with CF-HoT control
        out = generate(tok, model, f"{persona_directive()}\nUser request: {u}\nProvide procedure + Nietzschean maxim.")
        Store.log_mem("reply", {"in": u, "out": out})
        print(out)

        # NOTE(review): the period is hard-coded to 3 while Config defines
        # reflect_every = 3; confirm whether that setting was meant here.
        if Config.use_lht_reasoning and Store.state["turn"] % 3 == 0:
            print(tool_lht_analyze(out, tok))

        Store.state["turn"] += 1
        Store.save()

    # plan/reflect/tool commands update counters in Store.state without
    # saving; persist them before reporting.
    Store.save()
    final_report()


if __name__ == "__main__":
    main()
|