################################################################################
# Copyright(c)2020-2025 Shanghai Biren Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################

from typing import Optional

from fastcore.basics import patch_to

import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed import stateless_destroy_torch_distributed_process_group
from vllm.distributed.parallel_state import get_dp_group
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
from vllm.tracing import init_tracer
from vllm.transformers_utils.tokenizer import init_tokenizer_from_configs
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.llm_engine import LLMEngine
from vllm.v1.engine.output_processor import OutputProcessor
from vllm.v1.engine.processor import Processor
from vllm.v1.executor.abstract import Executor
from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager

from vllm_br import envs as envs_br
from vllm_br.utils import (create_cpu_all_reduce_shared_mem,
                           get_cpu_all_reduce_shared_mem)


@patch_to(LLMEngine)
def __init__(
    self,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    stat_loggers: Optional[list[StatLoggerFactory]] = None,
    mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
    use_cached_outputs: bool = False,
    multiprocess_mode: bool = False,
) -> None:
    """Replacement ``LLMEngine.__init__`` installed via ``patch_to``.

    Builds the tokenizer, input ``Processor``, ``OutputProcessor`` and
    ``EngineCoreClient``. Before any of that, and only when
    ``VLLM_BR_USE_CPU_ALL_REDUCE`` is non-zero, it allocates the CPU
    all-reduce shared memory. ``__del__`` releases that memory again.

    Args:
        vllm_config: Full engine configuration.
        executor_class: Executor type forwarded to the engine-core client.
        log_stats: When true, a ``StatLoggerManager`` is created.
        usage_context: Accepted for signature compatibility; unused here.
        stat_loggers: Must be ``None`` (custom loggers are rejected).
        mm_registry: Multimodal registry handed to the ``Processor``.
        use_cached_outputs: Accepted for signature compatibility; unused here.
        multiprocess_mode: Forwarded to ``EngineCoreClient.make_client``.
            When false, this constructor also owns DP-group creation and
            exposes ``model_executor``.

    Raises:
        ValueError: If ``envs.VLLM_USE_V1`` is false.
        NotImplementedError: If ``stat_loggers`` is not ``None``.
    """
    if not envs.VLLM_USE_V1:
        raise ValueError("Using V1 LLMEngine, but envs.VLLM_USE_V1=False. "
                         "This should not happen. As a workaround, try using "
                         "LLMEngine.from_vllm_config(...) or explicitly set "
                         "VLLM_USE_V1=0 or 1 and report this issue on Github.")

    if stat_loggers is not None:
        raise NotImplementedError(
            "Passing StatLoggers to LLMEngine in V1 is not yet supported. "
            "Set VLLM_USE_V1=0 and file and issue on Github.")

    # vllm_br-specific: set up the CPU all-reduce shared memory before any
    # engine component is created.
    if envs_br.VLLM_BR_USE_CPU_ALL_REDUCE != 0:
        create_cpu_all_reduce_shared_mem()

    self.vllm_config = vllm_config
    self.observability_config = vllm_config.observability_config
    self.model_config = vllm_config.model_config
    self.cache_config = vllm_config.cache_config

    self.log_stats = log_stats

    executor_backend = (
        self.vllm_config.parallel_config.distributed_executor_backend)
    parallel_config = vllm_config.parallel_config
    self.external_launcher_dp = (parallel_config.data_parallel_size > 1
                                 and executor_backend == "external_launcher")

    # important: init dp group before init the engine_core
    # In the decoupled engine case this is handled in EngineCoreProc.
    if (not multiprocess_mode and parallel_config.data_parallel_size > 1
            and not self.external_launcher_dp):
        self.dp_group = parallel_config.stateless_init_dp_group()
    else:
        self.dp_group = None
    self.should_execute_dummy_batch = False

    if self.model_config.skip_tokenizer_init:
        self.tokenizer = None
    else:
        # Tokenizer (+ ensure liveness if running in another process).
        self.tokenizer = init_tokenizer_from_configs(
            model_config=vllm_config.model_config)

    # Processor (convert Inputs --> EngineCoreRequests)
    self.processor = Processor(vllm_config=vllm_config,
                               tokenizer=self.tokenizer,
                               mm_registry=mm_registry)

    # OutputProcessor (convert EngineCoreOutputs --> RequestOutput).
    self.output_processor = OutputProcessor(self.tokenizer,
                                            log_stats=self.log_stats)
    if self.observability_config.otlp_traces_endpoint is not None:
        tracer = init_tracer("vllm.llm_engine",
                             self.observability_config.otlp_traces_endpoint)
        self.output_processor.tracer = tracer

    # EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
    self.engine_core = EngineCoreClient.make_client(
        multiprocess_mode=multiprocess_mode,
        asyncio_mode=False,
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=self.log_stats,
    )

    self.logger_manager: Optional[StatLoggerManager] = None  # type: ignore
    if self.log_stats:
        # stat_loggers is always None at this point (rejected above), so
        # only the default loggers are installed.
        self.logger_manager = StatLoggerManager(
            vllm_config=vllm_config,
            custom_stat_loggers=stat_loggers,
            enable_default_loggers=log_stats,
        )
        self.logger_manager.log_engine_initialized()

    if not multiprocess_mode:
        # for v0 compatibility
        self.model_executor = self.engine_core.engine_core.model_executor  # type: ignore

    if self.external_launcher_dp:
        # If we use DP in external launcher mode, we reuse the
        # existing DP group used for data communication.
        self.dp_group = get_dp_group().cpu_group

    # Don't keep the dummy data in memory
    self.reset_mm_cache()


@patch_to(LLMEngine)
def __del__(self):
    """Release resources acquired by the patched ``__init__``.

    Destroys the stateless DP process group this engine created. That step
    is skipped when the group is the one borrowed via ``get_dp_group()`` in
    external-launcher mode. It then cleans up the CPU all-reduce shared
    memory, if any exists.
    """
    # Fetch the group first, then test it. The previous
    # ``if dp_group := a and b`` form bound ``dp_group`` to the bool result of
    # ``a and b`` (``:=`` binds looser than ``and``), so a bool rather than
    # the process group was handed to the destroy call. getattr() keeps this
    # safe when __init__ raised before these attributes were assigned.
    dp_group = getattr(self, "dp_group", None)
    try:
        if dp_group and not getattr(self, "external_launcher_dp", False):
            stateless_destroy_torch_distributed_process_group(dp_group)
    finally:
        # Release the shared memory even if DP-group teardown failed.
        shared_mem = get_cpu_all_reduce_shared_mem()
        if shared_mem is not None:
            shared_mem._cleanup()