80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
"""
|
|
Data pipeline: streams and tokenizes FineWeb-Edu (sample-10BT) for pretraining.
|
|
Packs sequences to max_seq_len for efficiency (no padding waste).
|
|
"""
|
|
|
|
import os
|
|
import torch
|
|
from torch.utils.data import IterableDataset, DataLoader
|
|
from datasets import load_dataset
|
|
from transformers import AutoTokenizer
|
|
|
|
|
|
def get_tokenizer(name: str = "mistralai/Mistral-7B-v0.1"):
    """Load the fast tokenizer for ``name`` and make sure it has a pad token.

    Args:
        name: Model identifier handed to ``AutoTokenizer.from_pretrained``.
            Defaults to the Mistral-7B tokenizer.

    Returns:
        The loaded tokenizer. If it ships without a pad token, the EOS token
        is reused as the pad token.
    """
    tokenizer = AutoTokenizer.from_pretrained(name, use_fast=True)

    # Nothing to patch when the tokenizer already defines a pad token.
    if tokenizer.pad_token is not None:
        return tokenizer

    # Fall back to EOS so that code relying on `pad_token` keeps working.
    tokenizer.pad_token = tokenizer.eos_token
    return tokenizer
|
|
|
|
|
|
class PackedPretrainDataset(IterableDataset):
    """
    Streams text from a HuggingFace dataset, tokenizes on the fly,
    and packs the tokens into fixed-length sequences (no padding).

    Every document is tokenized without special tokens and followed by a
    single EOS id; the resulting token stream is cut into windows of
    ``max_seq_len + 1`` tokens, each yielded as an ``(input_ids, labels)``
    pair where ``labels`` is ``input_ids`` shifted left by one position.

    Args:
        tokenizer: Object exposing ``encode(text, add_special_tokens=...)``
            and ``eos_token_id``.
        max_seq_len: Length of each yielded ``input_ids`` / ``labels`` tensor.
        split: Dataset split passed to ``load_dataset``.
        cache_dir: Cache directory passed to ``load_dataset``.
        seed: Seed for the streaming shuffle. When ``world_size > 1`` it must
            be identical on every rank so the per-rank splits stay disjoint.
        rank: Index of this process among ``world_size`` processes.
        world_size: Number of processes sharing the stream. With the default
            of 1 the stream is not split.

    Raises:
        ValueError: If ``max_seq_len`` < 1, ``world_size`` < 1, ``rank`` is
            outside ``[0, world_size)``, or the tokenizer has no EOS id.
    """

    # Streaming source and filtering knobs; override in a subclass to retarget.
    DATASET_NAME = "HuggingFaceFW/fineweb-edu"
    DATASET_CONFIG = "sample-10BT"
    SHUFFLE_BUFFER_SIZE = 10_000
    MIN_CHARS = 50  # documents shorter than this (after stripping) are skipped

    def __init__(
        self,
        tokenizer,
        max_seq_len: int,
        split: str = "train",
        cache_dir: str = None,
        seed: int = 42,
        *,
        rank: int = 0,
        world_size: int = 1,
    ):
        super().__init__()
        if max_seq_len < 1:
            # A non-positive length would never fill a window and the packing
            # buffer would grow without bound.
            raise ValueError(f"max_seq_len must be >= 1, got {max_seq_len}")
        if world_size < 1:
            raise ValueError(f"world_size must be >= 1, got {world_size}")
        if not 0 <= rank < world_size:
            raise ValueError(
                f"rank must be in [0, {world_size}), got {rank}"
            )
        if tokenizer.eos_token_id is None:
            # Without an EOS id the stream would contain None and fail later
            # inside torch.tensor with a far less obvious error.
            raise ValueError("tokenizer must define eos_token_id")

        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len
        self.split = split
        self.cache_dir = cache_dir
        self.seed = seed
        self.rank = rank
        self.world_size = world_size
        self.eos_id = tokenizer.eos_token_id

    def _token_stream(self):
        """Yield token ids one at a time, with an EOS id after each document."""
        ds = load_dataset(
            self.DATASET_NAME,
            name=self.DATASET_CONFIG,
            split=self.split,
            streaming=True,
            cache_dir=self.cache_dir,
        )
        ds = ds.shuffle(seed=self.seed, buffer_size=self.SHUFFLE_BUFFER_SIZE)

        if self.world_size > 1:
            # Give each rank a disjoint slice of the stream instead of letting
            # every rank read the full corpus.
            from datasets.distributed import split_dataset_by_node

            ds = split_dataset_by_node(
                ds, rank=self.rank, world_size=self.world_size
            )
        # NOTE(review): splitting across DataLoader worker processes is left
        # to the `datasets` streaming iterator — confirm for the installed
        # version.

        for example in ds:
            # `or ""` also covers records whose "text" field is present but None.
            text = example.get("text") or ""
            if len(text.strip()) < self.MIN_CHARS:
                continue
            token_ids = self.tokenizer.encode(text, add_special_tokens=False)
            yield from token_ids
            yield self.eos_id

    def __iter__(self):
        """Yield ``(input_ids, labels)`` LongTensor pairs of length ``max_seq_len``.

        Tokens left over when the stream ends (fewer than ``max_seq_len + 1``)
        are dropped.
        """
        window = self.max_seq_len + 1  # one extra token provides the last label
        buffer = []
        for token_id in self._token_stream():
            buffer.append(token_id)
            if len(buffer) == window:
                input_ids = torch.tensor(buffer[:-1], dtype=torch.long)
                labels = torch.tensor(buffer[1:], dtype=torch.long)
                yield input_ids, labels
                buffer = []


def create_dataloader(tokenizer, config, rank: int = 0, world_size: int = 1, seed_override: int = None):
    """Build a ``DataLoader`` over :class:`PackedPretrainDataset`.

    Args:
        tokenizer: Tokenizer forwarded to the dataset.
        config: Object providing ``seed``, ``max_seq_len``, ``data_cache_dir``,
            ``batch_size_per_gpu`` and ``num_workers``.
        rank: Index of the calling process.
        world_size: Total number of processes. When greater than 1 the stream
            is split across ranks and all ranks share one shuffle seed.
        seed_override: If not ``None``, used instead of ``config.seed``.

    Returns:
        A ``DataLoader`` yielding batched ``(input_ids, labels)`` tensors.
    """
    seed = seed_override if seed_override is not None else config.seed

    if world_size > 1:
        # Disjoint per-rank splits require the same shuffle seed on every rank.
        dataset_seed, shard_rank, shard_world = seed, rank, world_size
    else:
        # Legacy path for callers that only pass `rank`: no splitting, ranks
        # are merely decorrelated by offsetting the shuffle seed.
        dataset_seed, shard_rank, shard_world = seed + rank, 0, 1

    dataset = PackedPretrainDataset(
        tokenizer=tokenizer,
        max_seq_len=config.max_seq_len,
        split="train",
        cache_dir=config.data_cache_dir,
        seed=dataset_seed,
        rank=shard_rank,
        world_size=shard_world,
    )

    loader_kwargs = {
        "batch_size": config.batch_size_per_gpu,
        "num_workers": config.num_workers,
        "pin_memory": True,
    }
    if config.num_workers > 0:
        # DataLoader rejects an explicit prefetch_factor when num_workers == 0.
        loader_kwargs["prefetch_factor"] = 4

    return DataLoader(dataset, **loader_kwargs)
|