Files
enginex-mlu370-vllm/vllm-v0.6.2/vllm/executor/cpu_executor.py

378 lines
14 KiB
Python
Raw Normal View History

2026-02-04 17:22:39 +08:00
import os
from functools import partial
from typing import Any, Awaitable, List, Optional, Set, Tuple, Union
import vllm.envs as envs
from vllm.config import (CacheConfig, ModelConfig, ParallelConfig,
SchedulerConfig)
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.executor.multiproc_worker_utils import (ProcessWorkerWrapper,
ResultHandler, WorkerMonitor)
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sequence import ExecuteModelRequest
from vllm.utils import (GiB_bytes, get_distributed_init_method, get_open_port,
get_vllm_instance_id, make_async)
from vllm.worker.worker_base import WorkerWrapperBase
# Module-level logger, obtained from vLLM's init_logger for this module's name.
logger = init_logger(__name__)
class CPUExecutor(ExecutorBase):
    """Executor for the CPU backend.

    A driver worker is paired with one ``ProcessWorkerWrapper`` per
    additional tensor-parallel rank. Worker methods are invoked by name,
    either on the driver only or on every worker via ``_run_workers``.
    """

    uses_ray: bool = False

    def _init_executor(self) -> None:
        """Set up the environment, create the workers and load the model.

        Steps, in order: export the environment variables used by the CPU
        workers, pass each config through its ``_verify_and_get_*`` helper,
        pick a loopback init method, create the driver and remote workers,
        start the result handler / worker monitor when worker processes
        exist, and finally run ``init_device`` and ``load_model`` on all
        workers.
        """
        assert self.device_config.device_type == "cpu"
        # Reminder: Please update docs/source/serving/compatibility_matrix.rst
        # If the feature combo become valid
        assert self.lora_config is None, "cpu backend doesn't support LoRA"

        #
        # Environment variables for CPU executor
        #

        # Ensure that VLLM_INSTANCE_ID is set, to be inherited by workers
        os.environ["VLLM_INSTANCE_ID"] = get_vllm_instance_id()

        # Disable torch async compiling which won't work with daemonic processes
        os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"

        # Intel OpenMP setting
        if "libiomp5.so" in os.getenv("LD_PRELOAD", ""):
            os.environ.update({
                # The time(milliseconds) that a thread should wait after
                # completing the execution of a parallel region, before
                # sleeping.
                'KMP_BLOCKTIME': "1",
                # Prevents the CPU to run into low performance state
                'KMP_TPAUSE': "0",
                # Provides fine granularity parallelism
                'KMP_FORKJOIN_BARRIER_PATTERN': "dist,dist",
                'KMP_PLAIN_BARRIER_PATTERN': "dist,dist",
                'KMP_REDUCTION_BARRIER_PATTERN': "dist,dist",
            })

        # To hint IPEX uses shared memory based AllReduce
        os.environ["LOCAL_WORLD_SIZE"] = str(
            self.parallel_config.tensor_parallel_size)

        self.model_config = _verify_and_get_model_config(self.model_config)
        self.cache_config = _verify_and_get_cache_config(self.cache_config)
        self.scheduler_config = _verify_and_get_scheduler_config(
            self.scheduler_config)
        self.parallel_config = _verify_and_get_parallel_config(
            self.parallel_config)

        # Multiprocessing-based executor does not support multi-node setting.
        # Since it only works for single node, we can use the loopback address
        # 127.0.0.1 for communication.
        self.distributed_init_method = get_distributed_init_method(
            "127.0.0.1", get_open_port())

        is_async = isinstance(self, CPUExecutorAsync)
        world_size = self.parallel_config.tensor_parallel_size
        handler = ResultHandler()
        self.parallel_worker_tasks: Optional[Union[Any, Awaitable[Any]]] = None
        self.workers = []

        def spawn(rank: int) -> ProcessWorkerWrapper:
            # Wrap worker creation for `rank` in a ProcessWorkerWrapper
            # that reports through the shared result handler.
            return ProcessWorkerWrapper(
                handler,
                partial(
                    self._create_worker,
                    rank=rank,
                    local_rank=rank,
                ))

        if is_async:
            # Async mode: every rank, the driver included, gets a wrapper.
            wrapped = [spawn(rank) for rank in range(0, world_size)]
            self.driver_worker = wrapped[0]
            self.workers = wrapped[1:]
            self.driver_method_invoker = _async_driver_method_invoker
        else:
            # Sync mode: the driver is created directly; only ranks >= 1
            # are wrapped.
            self.driver_worker = self._create_worker()
            self.driver_method_invoker = _driver_method_invoker
            if world_size != 1:
                self.workers = [spawn(rank) for rank in range(1, world_size)]

        self.worker_monitor = None
        if is_async or world_size != 1:
            monitored = (self.workers +
                         [self.driver_worker] if is_async else self.workers)
            self.worker_monitor = WorkerMonitor(monitored, handler)
            handler.start()
            self.worker_monitor.start()

        self._run_workers("init_device")
        self._run_workers("load_model")

    def _create_worker(
        self,
        local_rank: int = 0,
        rank: int = 0,
    ):
        """Create and initialize a ``CPUWorker`` for the given rank.

        Args:
            local_rank: Local rank handed to the worker.
            rank: Global rank handed to the worker; rank 0 is flagged as
                the driver worker.

        Returns:
            The ``worker`` attribute of the initialized wrapper.
        """
        wrapper = WorkerWrapperBase(
            worker_module_name="vllm.worker.cpu_worker",
            worker_class_name="CPUWorker",
        )

        assert self.distributed_init_method is not None

        wrapper.init_worker(
            vllm_config=self.vllm_config,
            local_rank=local_rank,
            rank=rank,
            distributed_init_method=self.distributed_init_method,
            kv_cache_dtype=self.cache_config.cache_dtype,
            is_driver_worker=rank == 0,
        )
        return wrapper.worker

    def _run_workers(
        self,
        method: str,
        *args,
        async_run_remote_workers_only: bool = False,
        max_concurrent_workers: Optional[int] = None,
        **kwargs,
    ) -> Any:
        """Runs the given method on all workers.

        Args:
            method: Name of the worker method to invoke.
            async_run_remote_workers_only: If True the method will be run only
                in the remote workers, not the driver worker. It will also be
                run asynchronously and return a list of futures rather than
                blocking on the results.
            max_concurrent_workers: Not supported; any truthy value raises.

        Returns:
            The list of remote futures when
            ``async_run_remote_workers_only`` is set, otherwise a list with
            the driver's result first followed by each remote result.

        Raises:
            NotImplementedError: If ``max_concurrent_workers`` is truthy.
        """
        if max_concurrent_workers:
            raise NotImplementedError(
                "max_concurrent_workers is not supported yet.")

        # Dispatch to the remote workers first, before the driver runs.
        pending = [
            remote.execute_method(method, *args, **kwargs)
            for remote in self.workers
        ]

        if async_run_remote_workers_only:
            # Just return futures
            return pending

        results = [
            self.driver_method_invoker(self.driver_worker, method, *args,
                                       **kwargs)
        ]

        # Collect the remote results after the driver call has returned.
        results.extend(future.get() for future in pending)
        return results

    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Ask the driver worker how many KV blocks are available."""
        invoke = self.driver_method_invoker
        return invoke(self.driver_worker, "determine_num_available_blocks")

    def initialize_cache(self, num_gpu_blocks: int,
                         num_cpu_blocks: int) -> None:
        """Initialize the KV cache on every worker.

        Args:
            num_gpu_blocks: Forwarded to the workers; also the value that is
                logged as the number of CPU blocks.
            num_cpu_blocks: Forwarded to the workers unchanged.
        """
        # NOTE: We log here to avoid multiple logs when number of workers is
        # greater than one. We could log in the engine, but not all executors
        # have GPUs.
        # NOTE: `cpu block` for CPU backend is located on CPU memory but is
        # referred as `gpu block`. Because we want to reuse the existing block
        # management procedure.
        logger.info("# CPU blocks: %d", num_gpu_blocks)

        self._run_workers(
            "initialize_cache",
            num_gpu_blocks=num_gpu_blocks,
            num_cpu_blocks=num_cpu_blocks,
        )

    def execute_model(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        """Run ``execute_model`` on the driver worker and return its output.

        When tensor parallelism is in use and no execution loop has been
        started yet, ``start_worker_execution_loop`` is first launched
        asynchronously on the remote workers and the futures are kept in
        ``parallel_worker_tasks``.
        """
        if (self.parallel_config.tensor_parallel_size > 1
                and self.parallel_worker_tasks is None):
            self.parallel_worker_tasks = self._run_workers(
                "start_worker_execution_loop",
                async_run_remote_workers_only=True,
            )
        return self.driver_method_invoker(self.driver_worker,
                                          "execute_model", execute_model_req)

    def stop_remote_worker_execution_loop(self) -> None:
        """Stop the execution loop of the remote workers, if one is running.

        Passing None will cause the driver to stop the model execution
        loop running in each of the remote workers.
        """
        if self.parallel_worker_tasks is None:
            return

        self.driver_method_invoker(self.driver_worker, "execute_model", None)
        pending, self.parallel_worker_tasks = self.parallel_worker_tasks, None
        # Ensure that workers exit model loop cleanly
        # (this will raise otherwise)
        self._wait_for_tasks_completion(pending)

    def add_lora(self, lora_request: LoRARequest) -> bool:
        """Run ``add_lora`` on all workers; True only if every result is truthy."""
        outcomes = self._run_workers("add_lora", lora_request)
        return all(outcomes)

    def remove_lora(self, lora_id: int) -> bool:
        """Run ``remove_lora`` on all workers; True only if every result is truthy."""
        outcomes = self._run_workers("remove_lora", lora_id)
        return all(outcomes)

    def pin_lora(self, lora_id: int) -> bool:
        """Run ``pin_lora`` on all workers; ``lora_id`` must be positive."""
        assert lora_id > 0, "lora_id must be greater than 0."
        outcomes = self._run_workers("pin_lora", lora_id=lora_id)
        return all(outcomes)

    def list_loras(self) -> Set[int]:
        """Return the result of ``list_loras`` from the driver worker."""
        invoke = self.driver_method_invoker
        return invoke(self.driver_worker, "list_loras")

    def add_prompt_adapter(
            self, prompt_adapter_request: PromptAdapterRequest) -> bool:
        """Run ``add_prompt_adapter`` on all workers; True only if all succeed."""
        outcomes = self._run_workers("add_prompt_adapter",
                                     prompt_adapter_request)
        return all(outcomes)

    def remove_prompt_adapter(self, prompt_adapter_id: int) -> bool:
        """Run ``remove_prompt_adapter`` on all workers; True only if all succeed."""
        outcomes = self._run_workers("remove_prompt_adapter",
                                     prompt_adapter_id)
        return all(outcomes)

    def list_prompt_adapters(self) -> Set[int]:
        """Return the result of ``list_prompt_adapters`` from the driver worker."""
        invoke = self.driver_method_invoker
        return invoke(self.driver_worker, "list_prompt_adapters")

    def pin_prompt_adapter(self, prompt_adapter_id: int) -> bool:
        """Run ``pin_prompt_adapter`` on all workers; True only if all succeed."""
        outcomes = self._run_workers("pin_prompt_adapter", prompt_adapter_id)
        return all(outcomes)

    def check_health(self) -> None:
        """Raises an error if engine is unhealthy."""
        if self.worker_monitor is None:
            # No monitor was created, so there is nothing to check.
            return
        if not self.worker_monitor.is_alive():
            raise RuntimeError("Worker processes are not running")

    def shutdown(self):
        """Close the worker monitor if one exists."""
        # getattr guards against the attribute never having been assigned.
        monitor = getattr(self, "worker_monitor", None)
        if monitor is not None:
            monitor.close()

    def _wait_for_tasks_completion(self, parallel_worker_tasks: Any) -> None:
        """Wait for futures returned from _run_workers() with
        async_run_remote_workers_only to complete."""
        for future in parallel_worker_tasks:
            future.get()

    def start_profile(self) -> None:
        """Invoke ``start_profile`` on the driver worker."""
        invoke = self.driver_method_invoker
        invoke(self.driver_worker, "start_profile")

    def stop_profile(self) -> None:
        """Invoke ``stop_profile`` on the driver worker."""
        invoke = self.driver_method_invoker
        invoke(self.driver_worker, "stop_profile")
class CPUExecutorAsync(CPUExecutor, ExecutorAsyncBase):
    """Async flavour of ``CPUExecutor`` that wraps the synchronous calls."""

    async def execute_model_async(
            self,
            execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
        """Await ``execute_model`` wrapped by ``make_async``."""
        run_async = make_async(self.execute_model)
        return await run_async(execute_model_req=execute_model_req)

    async def check_health_async(self) -> None:
        """Delegate to the synchronous ``check_health``."""
        self.check_health()
def _verify_and_get_model_config(config: ModelConfig) -> ModelConfig:
    """Turn on ``enforce_eager`` for the CPU backend and return the config.

    The config is modified in place; a warning is logged only when the
    flag had to be changed.
    """
    # Reminder: Please update docs/source/serving/compatibility_matrix.rst
    # If the feature combo become valid
    if config.enforce_eager:
        return config

    logger.warning(
        "CUDA graph is not supported on CPU, fallback to the eager "
        "mode.")
    config.enforce_eager = True
    return config
def _verify_and_get_scheduler_config(
        config: SchedulerConfig) -> SchedulerConfig:
    """Turn off chunked prefill for the CPU backend and return the config.

    The config is modified in place; a warning is logged only when the
    flag had to be changed.
    """
    # Reminder: Please update docs/source/serving/compatibility_matrix.rst
    # If the feature combo become valid
    if not config.chunked_prefill_enabled:
        return config

    logger.warning("Chunked prefill is not supported on CPU, disable it.")
    config.chunked_prefill_enabled = False
    return config
def _verify_and_get_cache_config(config: CacheConfig) -> CacheConfig:
    """Adjust the cache config for the CPU backend and return it.

    Prefix caching is switched off (with a warning) when enabled, and
    ``cpu_kvcache_space_bytes`` is set from ``envs.VLLM_CPU_KVCACHE_SPACE``
    multiplied by ``GiB_bytes``; a value of 0 falls back to 4.

    Raises:
        RuntimeError: If ``envs.VLLM_CPU_KVCACHE_SPACE`` is not ``>= 0``.
    """
    # Reminder: Please update docs/source/serving/compatibility_matrix.rst
    # If the feature combo become valid
    if config.enable_prefix_caching:
        logger.warning("Prefix caching is not supported on CPU, disable it.")
        config.enable_prefix_caching = False

    space_gib = envs.VLLM_CPU_KVCACHE_SPACE

    # Reject invalid values first; written as the exact negation of the
    # acceptance test so every input is classified the same way.
    if not space_gib >= 0:
        raise RuntimeError(
            "Invalid environment variable VLLM_CPU_KVCACHE_SPACE"
            f" {space_gib}, expect a positive integer value.")

    if space_gib == 0:
        config.cpu_kvcache_space_bytes = 4 * GiB_bytes  # type: ignore
        logger.warning("Environment variable VLLM_CPU_KVCACHE_SPACE (GB) "
                       "for CPU backend is not set, using 4 by default.")
        return config

    config.cpu_kvcache_space_bytes = space_gib * GiB_bytes  # type: ignore
    return config
def _verify_and_get_parallel_config(config: ParallelConfig) -> ParallelConfig:
    """Switch an unsupported distributed executor backend to ``"mp"``.

    The config is modified in place and returned. A backend of ``None`` or
    ``"mp"`` is left untouched; any other value is logged and replaced.
    """
    requested = config.distributed_executor_backend
    if requested is not None and requested != "mp":
        logger.warning(
            "%s is not supported on CPU, fallback to mp distributed executor "
            "backend.", requested)
        config.distributed_executor_backend = "mp"
    return config
def _driver_method_invoker(driver, method: str, *args, **kwargs):
    """Look up ``method`` on ``driver`` by name and call it directly.

    Positional and keyword arguments are forwarded unchanged and the
    method's return value is returned as is.
    """
    bound_method = getattr(driver, method)
    return bound_method(*args, **kwargs)
def _async_driver_method_invoker(driver, method: str, *args, **kwargs):
    """Submit ``method`` through ``driver.execute_method`` and wait for it.

    ``execute_method`` is expected to return an object exposing ``get()``;
    the value returned by ``get()`` is passed back to the caller.
    """
    pending = driver.execute_method(method, *args, **kwargs)
    return pending.get()