# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from concurrent.futures import Future
from typing import Callable, Union

import torch
import torch.distributed as dist

from vllm.config import VllmConfig
from vllm.executor.executor_base import ExecutorBase
from vllm.executor.uniproc_executor import (  # noqa
    ExecutorWithExternalLauncher as ExecutorWithExternalLauncherV0)
from vllm.executor.uniproc_executor import (  # noqa
    UniProcExecutor as UniProcExecutorV0)
from vllm.v1.kv_cache_interface import KVCacheConfig, KVCacheSpec
from vllm.v1.outputs import ModelRunnerOutput

# Signature of callbacks registered via Executor.register_failure_callback:
# invoked with no arguments when the executor enters a permanent failed state.
FailureCallback = Callable[[], None]
class Executor(ExecutorBase):
    """
    Abstract class for v1 executors, mainly define some methods for v1.
    For methods shared by v0 and v1, define them in ExecutorBase"""

    @staticmethod
    def get_class(vllm_config: VllmConfig) -> type["Executor"]:
        """Resolve the concrete Executor subclass selected by the config.

        Accepts either an Executor subclass directly or one of the string
        backends: "ray", "mp", "uni", "external_launcher".
        """
        # distributed_executor_backend must be set in VllmConfig.__post_init__
        backend = vllm_config.parallel_config.distributed_executor_backend

        if isinstance(backend, type):
            # Caller supplied a class object; validate it before using it.
            if not issubclass(backend, ExecutorBase):
                raise TypeError(
                    "distributed_executor_backend must be a subclass of "
                    f"ExecutorBase. Got {backend}.")
            return backend
        if backend == "ray":
            from vllm.v1.executor.ray_distributed_executor import (  # noqa
                RayDistributedExecutor)
            return RayDistributedExecutor
        if backend == "mp":
            from vllm.v1.executor.multiproc_executor import MultiprocExecutor
            return MultiprocExecutor
        if backend == "uni":
            return UniProcExecutor
        if backend == "external_launcher":
            # TODO: make v1 scheduling deterministic
            # to support external launcher
            return ExecutorWithExternalLauncher
        raise ValueError("Unknown distributed executor backend: "
                         f"{backend}")

    def initialize_from_config(
            self, kv_cache_configs: list[KVCacheConfig]) -> None:
        """
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        """
        self.collective_rpc("initialize_from_config",
                            args=(kv_cache_configs, ))
        self.collective_rpc("compile_or_warm_up_model")

    def register_failure_callback(self, callback: FailureCallback):
        """
        Register a function to be called if the executor enters a permanent
        failed state.
        """
        # Default implementation is a no-op; subclasses that can detect
        # permanent failure override this.
        pass

    def determine_available_memory(self) -> list[int]:  # in bytes
        """Ask every worker how much memory is available; one entry per rank."""
        return self.collective_rpc("determine_available_memory")

    def get_kv_cache_specs(self) -> list[dict[str, KVCacheSpec]]:
        """Collect each worker's KV cache spec; one dict per rank."""
        return self.collective_rpc("get_kv_cache_spec")

    def execute_model(
        self,
        scheduler_output,
    ) -> Union[ModelRunnerOutput, Future[ModelRunnerOutput]]:
        """Run one model step on all workers and return rank 0's output."""
        results = self.collective_rpc("execute_model",
                                      args=(scheduler_output, ))
        return results[0]

    @property
    def max_concurrent_batches(self) -> int:
        # Base executors run one batch at a time; pipelined executors override.
        return 1

    def profile(self, is_start: bool = True):
        """Start (is_start=True) or stop profiling on all workers."""
        self.collective_rpc("profile", args=(is_start, ))
class UniProcExecutor(UniProcExecutorV0, Executor):
    """Single-process v1 executor: inherits the v0 uniproc implementation
    unchanged and mixes in the v1 Executor interface."""
class ExecutorWithExternalLauncher(ExecutorWithExternalLauncherV0, Executor):
    """v1 executor for processes started by an external launcher; overrides
    memory probing to agree across all ranks."""

    def determine_available_memory(self) -> list[int]:  # in bytes
        """Return the minimum available memory across all ranks.

        Every rank must use the same value so that their KV cache sizing
        agrees; same as determine_num_available_blocks in v0, we need to
        get the min across all ranks.
        """
        local_bytes = super().determine_available_memory()
        from vllm.distributed.parallel_state import get_world_group
        world_cpu_group = get_world_group().cpu_group
        # Reduce on CPU so this works regardless of the device backend.
        reduced = torch.tensor([local_bytes], device="cpu", dtype=torch.int64)
        dist.all_reduce(reduced, group=world_cpu_group, op=dist.ReduceOp.MIN)
        return [reduced.item()]