164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
|
|
"""
|
||
|
|
1B Parameter Decoder-Only Transformer — built from scratch.
|
||
|
|
|
||
|
|
Techniques:
|
||
|
|
- RoPE (Rotary Position Embeddings)
|
||
|
|
- Grouped Query Attention (GQA)
|
||
|
|
- SwiGLU Feed-Forward
|
||
|
|
- RMSNorm (pre-norm architecture)
|
||
|
|
- Flash Attention 2 (via PyTorch SDPA)
|
||
|
|
"""
|
||
|
|
|
||
|
|
import math
|
||
|
|
import torch
|
||
|
|
import torch.nn as nn
|
||
|
|
import torch.nn.functional as F
|
||
|
|
from .config import ModelConfig
|
||
|
|
|
||
|
|
|
||
|
|
class RMSNorm(nn.Module):
|
||
|
|
def __init__(self, dim: int, eps: float = 1e-5):
|
||
|
|
super().__init__()
|
||
|
|
self.eps = eps
|
||
|
|
self.weight = nn.Parameter(torch.ones(dim))
|
||
|
|
|
||
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||
|
|
norm = x.float().pow(2).mean(-1, keepdim=True).add(self.eps).rsqrt()
|
||
|
|
return (x.float() * norm).type_as(x) * self.weight
|
||
|
|
|
||
|
|
|
||
|
|
def precompute_rope_freqs(dim: int, max_seq_len: int, theta: float = 10000.0) -> torch.Tensor:
|
||
|
|
freqs = 1.0 / (theta ** (torch.arange(0, dim, 2).float() / dim))
|
||
|
|
t = torch.arange(max_seq_len, dtype=torch.float32)
|
||
|
|
freqs = torch.outer(t, freqs)
|
||
|
|
return torch.polar(torch.ones_like(freqs), freqs) # complex64
|
||
|
|
|
||
|
|
|
||
|
|
def apply_rope(xq: torch.Tensor, xk: torch.Tensor, freqs_cis: torch.Tensor):
|
||
|
|
B, S, H, D = xq.shape
|
||
|
|
xq_c = torch.view_as_complex(xq.float().reshape(B, S, H, D // 2, 2))
|
||
|
|
xk_c = torch.view_as_complex(xk.float().reshape(B, S, xk.shape[2], D // 2, 2))
|
||
|
|
freqs = freqs_cis[:S].clone().unsqueeze(0).unsqueeze(2)
|
||
|
|
xq_out = torch.view_as_real(xq_c * freqs).flatten(3)
|
||
|
|
xk_out = torch.view_as_real(xk_c * freqs).flatten(3)
|
||
|
|
return xq_out.type_as(xq), xk_out.type_as(xk)
|
||
|
|
|
||
|
|
|
||
|
|
class GroupedQueryAttention(nn.Module):
|
||
|
|
def __init__(self, config: ModelConfig):
|
||
|
|
super().__init__()
|
||
|
|
self.num_heads = config.num_attention_heads
|
||
|
|
self.num_kv_heads = config.num_kv_heads
|
||
|
|
self.head_dim = config.head_dim
|
||
|
|
self.num_groups = self.num_heads // self.num_kv_heads
|
||
|
|
|
||
|
|
self.wq = nn.Linear(config.hidden_dim, self.num_heads * self.head_dim, bias=False)
|
||
|
|
self.wk = nn.Linear(config.hidden_dim, self.num_kv_heads * self.head_dim, bias=False)
|
||
|
|
self.wv = nn.Linear(config.hidden_dim, self.num_kv_heads * self.head_dim, bias=False)
|
||
|
|
self.wo = nn.Linear(self.num_heads * self.head_dim, config.hidden_dim, bias=False)
|
||
|
|
|
||
|
|
def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
|
||
|
|
B, S, _ = x.shape
|
||
|
|
|
||
|
|
q = self.wq(x).view(B, S, self.num_heads, self.head_dim)
|
||
|
|
k = self.wk(x).view(B, S, self.num_kv_heads, self.head_dim)
|
||
|
|
v = self.wv(x).view(B, S, self.num_kv_heads, self.head_dim)
|
||
|
|
|
||
|
|
q, k = apply_rope(q, k, freqs_cis)
|
||
|
|
|
||
|
|
# Expand KV heads for GQA
|
||
|
|
if self.num_groups > 1:
|
||
|
|
k = k.unsqueeze(3).expand(B, S, self.num_kv_heads, self.num_groups, self.head_dim)
|
||
|
|
k = k.reshape(B, S, self.num_heads, self.head_dim)
|
||
|
|
v = v.unsqueeze(3).expand(B, S, self.num_kv_heads, self.num_groups, self.head_dim)
|
||
|
|
v = v.reshape(B, S, self.num_heads, self.head_dim)
|
||
|
|
|
||
|
|
# (B, num_heads, S, head_dim) for SDPA
|
||
|
|
q = q.transpose(1, 2)
|
||
|
|
k = k.transpose(1, 2)
|
||
|
|
v = v.transpose(1, 2)
|
||
|
|
|
||
|
|
out = F.scaled_dot_product_attention(q, k, v, is_causal=True)
|
||
|
|
out = out.transpose(1, 2).contiguous().view(B, S, -1)
|
||
|
|
return self.wo(out)
|
||
|
|
|
||
|
|
|
||
|
|
class SwiGLUFFN(nn.Module):
|
||
|
|
def __init__(self, config: ModelConfig):
|
||
|
|
super().__init__()
|
||
|
|
self.w_gate = nn.Linear(config.hidden_dim, config.intermediate_dim, bias=False)
|
||
|
|
self.w_up = nn.Linear(config.hidden_dim, config.intermediate_dim, bias=False)
|
||
|
|
self.w_down = nn.Linear(config.intermediate_dim, config.hidden_dim, bias=False)
|
||
|
|
|
||
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||
|
|
return self.w_down(F.silu(self.w_gate(x)) * self.w_up(x))
|
||
|
|
|
||
|
|
|
||
|
|
class TransformerBlock(nn.Module):
|
||
|
|
def __init__(self, config: ModelConfig):
|
||
|
|
super().__init__()
|
||
|
|
self.attention_norm = RMSNorm(config.hidden_dim, eps=config.rms_norm_eps)
|
||
|
|
self.attention = GroupedQueryAttention(config)
|
||
|
|
self.ffn_norm = RMSNorm(config.hidden_dim, eps=config.rms_norm_eps)
|
||
|
|
self.ffn = SwiGLUFFN(config)
|
||
|
|
|
||
|
|
def forward(self, x: torch.Tensor, freqs_cis: torch.Tensor) -> torch.Tensor:
|
||
|
|
x = x + self.attention(self.attention_norm(x), freqs_cis)
|
||
|
|
x = x + self.ffn(self.ffn_norm(x))
|
||
|
|
return x
|
||
|
|
|
||
|
|
|
||
|
|
class Transformer(nn.Module):
|
||
|
|
def __init__(self, config: ModelConfig):
|
||
|
|
super().__init__()
|
||
|
|
self.config = config
|
||
|
|
|
||
|
|
self.tok_embeddings = nn.Embedding(config.vocab_size, config.hidden_dim)
|
||
|
|
self.layers = nn.ModuleList([TransformerBlock(config) for _ in range(config.num_layers)])
|
||
|
|
self.norm = RMSNorm(config.hidden_dim, eps=config.rms_norm_eps)
|
||
|
|
self.output = nn.Linear(config.hidden_dim, config.vocab_size, bias=False)
|
||
|
|
|
||
|
|
# Pre-compute RoPE frequencies
|
||
|
|
self.register_buffer(
|
||
|
|
"freqs_cis",
|
||
|
|
precompute_rope_freqs(config.head_dim, config.max_seq_len * 2, config.rope_theta),
|
||
|
|
persistent=False,
|
||
|
|
)
|
||
|
|
|
||
|
|
self._init_weights()
|
||
|
|
|
||
|
|
def _init_weights(self):
|
||
|
|
"""Initialize with scaled normal, following GPT-NeoX / LLaMA conventions."""
|
||
|
|
for module in self.modules():
|
||
|
|
if isinstance(module, nn.Linear):
|
||
|
|
nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
||
|
|
if module.bias is not None:
|
||
|
|
nn.init.zeros_(module.bias)
|
||
|
|
elif isinstance(module, nn.Embedding):
|
||
|
|
nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
||
|
|
|
||
|
|
# Scale residual projections by 1/sqrt(2*num_layers)
|
||
|
|
scale = (2 * self.config.num_layers) ** -0.5
|
||
|
|
for layer in self.layers:
|
||
|
|
nn.init.normal_(layer.attention.wo.weight, mean=0.0, std=0.02 * scale)
|
||
|
|
nn.init.normal_(layer.ffn.w_down.weight, mean=0.0, std=0.02 * scale)
|
||
|
|
|
||
|
|
def forward(self, tokens: torch.Tensor, targets: torch.Tensor = None):
|
||
|
|
B, S = tokens.shape
|
||
|
|
h = self.tok_embeddings(tokens)
|
||
|
|
|
||
|
|
freqs_cis = self.freqs_cis[:S]
|
||
|
|
for layer in self.layers:
|
||
|
|
h = layer(h, freqs_cis)
|
||
|
|
h = self.norm(h)
|
||
|
|
logits = self.output(h)
|
||
|
|
|
||
|
|
loss = None
|
||
|
|
if targets is not None:
|
||
|
|
loss = F.cross_entropy(
|
||
|
|
logits.view(-1, logits.size(-1)),
|
||
|
|
targets.view(-1),
|
||
|
|
ignore_index=-100,
|
||
|
|
)
|
||
|
|
return logits, loss
|