################################################################################ # Copyright(c)2020-2025 Shanghai Biren Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from concurrent.futures import Future
from typing import Optional, Union

from vllm.executor.ray_utils import RayWorkerWrapper, ray
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.executor.ray_distributed_executor import RayDistributedExecutor
from vllm.v1.outputs import ModelRunnerOutput


class FutureWrapper(Future):
    """A wrapper around a Ray output reference to meet the interface
    of .execute_model().
    """

    def __init__(self, ref):
        super().__init__()
        # Ray object reference whose resolution yields the model output.
        self.ref = ref

    def result(self, timeout=None):
        """Block until the wrapped Ray task completes and return its output.

        Raises:
            NotImplementedError: a timeout was requested; this wrapper only
                supports an unbounded ``ray.get``.
        """
        if timeout is not None:
            raise NotImplementedError("timeout is not supported")
        return ray.get(self.ref)


def execute_model(
    self,
    scheduler_output,
    non_block: bool = False,
) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
    """Dispatch ``scheduler_output`` to every Ray worker and collect the
    output of the last pipeline-parallel stage.

    Returns the resolved ``ModelRunnerOutput`` when pipeline parallelism is
    off (``max_concurrent_batches == 1``); otherwise returns a
    ``FutureWrapper`` so the scheduler can overlap the next batch.
    """
    # TODO: currently only non_block=True semantics are supported; adapt this
    # override to honor the new non_block parameter.
    assert self.parallel_config.use_ray

    refs = []
    # Loop-invariant: only the final PP stage produces the model output.
    last_pp_rank = len(self.pp_tp_workers) - 1
    for pp_rank, tp_group in enumerate(self.pp_tp_workers):
        task_refs = [
            worker.execute_model_ray.remote(scheduler_output)
            for worker in tp_group
        ]
        if pp_rank == last_pp_rank:
            refs.extend(task_refs)

    # When PP is not used, we block here until the result is available.
    if self.max_concurrent_batches == 1:
        return ray.get(refs[0])

    # When PP is used, we return a FutureWrapper immediately so that
    # the scheduler can yield to the next batch.
    return FutureWrapper(refs[0])


def execute_model_ray(
        self,
        scheduler_output: SchedulerOutput) -> Optional[ModelRunnerOutput]:
    """Ray remote entry point: run one model step on this worker."""
    return self.worker.execute_model(scheduler_output)


# Monkey-patch vLLM's Ray executor and worker wrapper with the overrides above.
RayDistributedExecutor.execute_model = execute_model  # type: ignore[attr-defined]
RayWorkerWrapper.execute_model_ray = execute_model_ray  # type: ignore[attr-defined]