From 2fa5cec7754432590f0b1478bf7b2ee8333ae4d3 Mon Sep 17 00:00:00 2001 From: Lianmin Zheng Date: Mon, 16 Sep 2024 21:23:31 -0700 Subject: [PATCH] Simplify sampler and its error handling (#1441) --- python/sglang/srt/layers/sampler.py | 113 ++++-------------- python/sglang/srt/managers/schedule_batch.py | 1 + .../sglang/srt/model_executor/model_runner.py | 24 +--- .../srt/sampling/sampling_batch_info.py | 53 +------- 4 files changed, 32 insertions(+), 159 deletions(-) diff --git a/python/sglang/srt/layers/sampler.py b/python/sglang/srt/layers/sampler.py index c0e1d4c7b..55b2daab4 100644 --- a/python/sglang/srt/layers/sampler.py +++ b/python/sglang/srt/layers/sampler.py @@ -1,6 +1,5 @@ -import dataclasses import logging -from typing import Tuple, Union +from typing import Union import torch from flashinfer.sampling import ( @@ -9,43 +8,17 @@ from flashinfer.sampling import ( top_k_top_p_sampling_from_probs, top_p_renorm_prob, ) -from torch.library import custom_op as torch_custom_op -from vllm.model_executor.custom_op import CustomOp +from torch import nn from sglang.srt.layers.logits_processor import LogitsProcessorOutput - -# TODO: move this dict to another place from sglang.srt.managers.schedule_batch import global_server_args_dict from sglang.srt.sampling.sampling_batch_info import SamplingBatchInfo logger = logging.getLogger(__name__) -@dataclasses.dataclass -class SampleOutput: - success: torch.Tensor - probs: torch.Tensor - batch_next_token_ids: torch.Tensor - - -class Sampler(CustomOp): - def __init__(self): - super().__init__() - # FIXME: torch.multinomial has too many bugs - self.forward_native = self.forward_cuda - self.is_torch_compile = False - - def _get_probs(self, logits: torch.Tensor, sampling_info: SamplingBatchInfo): - # Post process logits - logits = logits.contiguous() - logits.div_(sampling_info.temperatures) - if self.is_torch_compile: - # FIXME: Temporary workaround for unknown bugs in torch.compile - logits.add_(0) - - return torch.softmax(logits, dim=-1) - - def forward_cuda( +class Sampler(nn.Module): + def forward( self, logits: Union[torch.Tensor, LogitsProcessorOutput], sampling_info: SamplingBatchInfo, @@ -53,7 +26,15 @@ class Sampler(CustomOp): if isinstance(logits, LogitsProcessorOutput): logits = logits.next_token_logits - probs = self._get_probs(logits, sampling_info) + # Post process logits + logits.div_(sampling_info.temperatures) + probs = logits[:] = torch.softmax(logits, dim=-1) + + if torch.any(torch.isnan(probs)): + logger.warning("Detected errors during sampling! NaN in the probability.") + probs = torch.where( + torch.isnan(probs), torch.full_like(probs, 1e-10), probs + ) if global_server_args_dict["sampling_backend"] == "flashinfer": max_top_k_round, batch_size = 32, probs.shape[0] @@ -67,12 +48,16 @@ class Sampler(CustomOp): probs, uniform_samples, sampling_info.min_ps ) else: - batch_next_token_ids, success = flashinfer_top_k_top_p( + batch_next_token_ids, success = top_k_top_p_sampling_from_probs( probs, uniform_samples, sampling_info.top_ks, sampling_info.top_ps ) + + if not torch.all(success): + logger.warning("Detected errors during sampling!") + batch_next_token_ids = torch.zeros_like(batch_next_token_ids) elif global_server_args_dict["sampling_backend"] == "pytorch": # Here we provide a slower fallback implementation. - batch_next_token_ids, success = top_k_top_p_min_p_sampling_from_probs_torch( + batch_next_token_ids = top_k_top_p_min_p_sampling_from_probs_torch( probs, sampling_info.top_ks, sampling_info.top_ps, sampling_info.min_ps ) else: @@ -80,48 +65,7 @@ class Sampler(CustomOp): f"Invalid sampling backend: {global_server_args_dict['sampling_backend']}" ) - return SampleOutput(success, probs, batch_next_token_ids) - - def forward_native( - self, - logits: Union[torch.Tensor, LogitsProcessorOutput], - sampling_info: SamplingBatchInfo, - ): - if isinstance(logits, LogitsProcessorOutput): - logits = logits.next_token_logits - - probs = self._get_probs(logits, sampling_info) - - batch_next_token_ids, success = top_k_top_p_min_p_sampling_from_probs_torch( - probs, sampling_info.top_ks, sampling_info.top_ps, sampling_info.min_ps - ) - - return SampleOutput(success, probs, batch_next_token_ids) - - -@torch_custom_op("my_lib::flashinfer_top_k_top_p", mutates_args={}) -def flashinfer_top_k_top_p( - probs: torch.Tensor, - uniform_samples: torch.Tensor, - top_ks: torch.Tensor, - top_ps: torch.Tensor, -) -> Tuple[torch.Tensor, torch.Tensor]: - # NOTE: we do not use min_p neither in CUDA nor in torch.compile - return top_k_top_p_sampling_from_probs(probs, uniform_samples, top_ks, top_ps) - - -@flashinfer_top_k_top_p.register_fake -def _( - probs: torch.Tensor, - uniform_samples: torch.Tensor, - top_ks: torch.Tensor, - top_ps: torch.Tensor, -) -> Tuple[torch.Tensor, torch.Tensor]: - bs = probs.shape[0] - return ( - torch.ones(bs, dtype=torch.bool, device=probs.device), - torch.zeros(bs, dtype=torch.int32, device=probs.device), - ) + return batch_next_token_ids def top_k_top_p_min_p_sampling_from_probs_torch( @@ -141,19 +85,6 @@ def top_k_top_p_min_p_sampling_from_probs_torch( ] = 0.0 probs_sort[probs_sort < min_p_thresholds.view(-1, 1)] = 0.0 probs_sort.div_(probs_sort.max(dim=-1, keepdim=True)[0]) - try: - # FIXME: torch.multiomial does not support num_samples = 1 - sampled_index = torch.multinomial(probs_sort, num_samples=2, replacement=True)[ - :, :1 - ] - except RuntimeError as e: - logger.warning(f"Sampling error: {e}") - batch_next_token_ids = torch.zeros( - (probs_sort.shape[0],), dtype=torch.int32, device=probs.device - ) - success = torch.zeros(probs.shape[0], dtype=torch.bool, device=probs.device) - return batch_next_token_ids, success - + sampled_index = torch.multinomial(probs_sort, num_samples=1) batch_next_token_ids = torch.gather(probs_idx, dim=1, index=sampled_index).view(-1) - success = torch.ones(probs.shape[0], dtype=torch.bool, device=probs.device) - return batch_next_token_ids, success + return batch_next_token_ids diff --git a/python/sglang/srt/managers/schedule_batch.py b/python/sglang/srt/managers/schedule_batch.py index cd6d148a7..edcf39026 100644 --- a/python/sglang/srt/managers/schedule_batch.py +++ b/python/sglang/srt/managers/schedule_batch.py @@ -360,6 +360,7 @@ class ScheduleBatch: tree_cache: BasePrefixCache forward_mode: ForwardMode = None + sampling_info: SamplingBatchInfo = None # Batched arguments to model runner input_ids: torch.Tensor = None diff --git a/python/sglang/srt/model_executor/model_runner.py b/python/sglang/srt/model_executor/model_runner.py index f4cdb77ba..123c98f56 100644 --- a/python/sglang/srt/model_executor/model_runner.py +++ b/python/sglang/srt/model_executor/model_runner.py @@ -40,7 +40,7 @@ from vllm.model_executor.models import ModelRegistry from sglang.srt.configs.model_config import AttentionArch, ModelConfig from sglang.srt.layers.attention_backend import FlashInferAttnBackend, TritonAttnBackend from sglang.srt.layers.logits_processor import LogitsProcessorOutput -from sglang.srt.layers.sampler import SampleOutput, Sampler +from sglang.srt.layers.sampler import Sampler from sglang.srt.lora.lora_manager import LoRAManager from sglang.srt.managers.schedule_batch import ScheduleBatch, global_server_args_dict from sglang.srt.mem_cache.memory_pool import ( @@ -516,21 +516,6 @@ class ModelRunner: else: raise ValueError(f"Invaid forward mode: {batch.forward_mode}") - def _check_sample_results(self, sample_output: SampleOutput): - if not torch.all(sample_output.success): - probs = sample_output.probs - batch_next_token_ids = sample_output.batch_next_token_ids - logging.warning("Sampling failed, fallback to top_k=1 strategy") - probs = probs.masked_fill(torch.isnan(probs), 0.0) - argmax_ids = torch.argmax(probs, dim=-1) - batch_next_token_ids = torch.where( - sample_output.success, batch_next_token_ids, argmax_ids - ) - sample_output.probs = probs - sample_output.batch_next_token_ids = batch_next_token_ids - - return sample_output.batch_next_token_ids - def _apply_logits_bias( self, logits: torch.Tensor, sampling_info: SamplingBatchInfo ): @@ -559,13 +544,16 @@ class ModelRunner: def sample( self, logits_output: LogitsProcessorOutput, batch: ScheduleBatch ) -> torch.Tensor: + # Put CPU-heavy tasks here. They will be overlapped with the forward pass. batch.sampling_info.update_regex_vocab_mask(batch) batch.sampling_info.update_penalties() logits = self._apply_logits_bias( logits_output.next_token_logits, batch.sampling_info ) - sample_output = self.sampler(logits, batch.sampling_info) - return self._check_sample_results(sample_output) + + # Sample the next tokens. + next_token_ids = self.sampler(logits, batch.sampling_info) + return next_token_ids @lru_cache() diff --git a/python/sglang/srt/sampling/sampling_batch_info.py b/python/sglang/srt/sampling/sampling_batch_info.py index b73a15a91..5af692868 100644 --- a/python/sglang/srt/sampling/sampling_batch_info.py +++ b/python/sglang/srt/sampling/sampling_batch_info.py @@ -34,56 +34,6 @@ class SamplingBatchInfo: linear_penalties: torch.Tensor = None scaling_penalties: torch.Tensor = None - def __len__(self): - return len(self.temperatures) - - def can_run_in_cuda_graph(self): - # Vocab bias and min_ps are not supported in CUDA graph - return ( - self.logit_bias is None - and self.linear_penalties is None - and self.scaling_penalties is None - and not self.need_min_p_sampling - ) - - @classmethod - def dummy_one(cls, max_bs: int, vocab_size: int): - ret = cls(vocab_size=vocab_size) - with torch.device("cuda"): - ret.temperatures = torch.ones((max_bs, 1), dtype=torch.float) - ret.top_ps = torch.ones((max_bs,), dtype=torch.float) - ret.top_ks = torch.ones((max_bs,), dtype=torch.int) - ret.vocab_mask = torch.zeros((max_bs, vocab_size), dtype=torch.bool) - return ret - - def __getitem__(self, key): - if isinstance(key, slice): - # NOTE:This method is only used in CUDA graph - assert self.can_run_in_cuda_graph() - return SamplingBatchInfo( - vocab_size=self.vocab_size, - temperatures=self.temperatures[key], - top_ps=self.top_ps[key], - top_ks=self.top_ks[key], - vocab_mask=self.vocab_mask[key], - ) - else: - raise NotImplementedError - - def inplace_assign(self, bs: int, other: SamplingBatchInfo): - # NOTE:This method is only used in CUDA graph - assert self.can_run_in_cuda_graph() - - self.vocab_size = other.vocab_size - self.temperatures[:bs] = other.temperatures - self.top_ps[:bs] = other.top_ps - self.top_ks[:bs] = other.top_ks - - if other.vocab_mask is None: - self.vocab_mask[:bs].fill_(False) - else: - self.vocab_mask[:bs] = other.vocab_mask - @classmethod def from_schedule_batch(cls, batch: ScheduleBatch, vocab_size: int): reqs = batch.reqs @@ -130,6 +80,9 @@ class SamplingBatchInfo: return ret + def __len__(self): + return len(self.temperatures) + def update_penalties(self): self.scaling_penalties = None self.linear_penalties = None