# SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project import contextlib import numpy as np import torch from vllm.v1.outputs import AsyncModelRunnerOutput, LogprobsTensors, ModelRunnerOutput from vllm.v1.worker.gpu.sample.output import SamplerOutput class AsyncOutput(AsyncModelRunnerOutput): def __init__( self, model_runner_output: ModelRunnerOutput, sampler_output: SamplerOutput, num_sampled_tokens: torch.Tensor, main_stream: torch.cuda.Stream, copy_stream: torch.cuda.Stream, copy_event: torch.cuda.Event, ): # NOTE(woosuk): We must retain references to the GPU tensors, # as the copy operations are performed on a different CUDA stream than # the one where the tensors were created. self.model_runner_output = model_runner_output self.sampler_output = sampler_output self.num_sampled_tokens = num_sampled_tokens self.copy_event = copy_event with stream(copy_stream, main_stream): copy_stream.wait_stream(main_stream) self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids) self.logprobs_tensors: LogprobsTensors | None = None if sampler_output.logprobs_tensors is not None: self.logprobs_tensors = ( sampler_output.logprobs_tensors.to_cpu_nonblocking() ) self.num_nans: np.ndarray | None = None if sampler_output.num_nans is not None: self.num_nans = async_copy_to_np(sampler_output.num_nans) self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens) self.prompt_logprobs_dict = { k: v.to_cpu_nonblocking() if v is not None else None for k, v in self.model_runner_output.prompt_logprobs_dict.items() } self.copy_event.record(copy_stream) def get_output(self) -> ModelRunnerOutput: self.copy_event.synchronize() # NOTE(woosuk): The following code is to ensure compatibility with # the existing model runner. # Going forward, we should keep the data structures as NumPy arrays # rather than Python lists. sampled_token_ids: list[list[int]] = self.sampled_token_ids.tolist() num_sampled_tokens: list[int] = self.num_sampled_tokens_np.tolist() for token_ids, num_tokens in zip(sampled_token_ids, num_sampled_tokens): del token_ids[num_tokens:] self.model_runner_output.sampled_token_ids = sampled_token_ids if self.num_nans is not None: self.model_runner_output.num_nans_in_logits = dict( zip(self.model_runner_output.req_ids, self.num_nans.tolist()) ) if self.logprobs_tensors is not None: self.model_runner_output.logprobs = self.logprobs_tensors.tolists() self.model_runner_output.prompt_logprobs_dict = self.prompt_logprobs_dict return self.model_runner_output def async_copy_to_np(x: torch.Tensor) -> np.ndarray: return x.to("cpu", non_blocking=True).numpy() @contextlib.contextmanager def stream(to_stream: torch.cuda.Stream, from_stream: torch.cuda.Stream): """Lightweight version of torch.cuda.stream() context manager which avoids current_stream and device lookups. """ try: torch.cuda.set_stream(to_stream) yield finally: torch.cuda.set_stream(from_stream)