Files
enginex-biren-vllm/vllm_br/v1/engine/llm_engine.py
2026-03-10 13:31:25 +08:00

144 lines
5.9 KiB
Python

################################################################################
# Copyright(c)2020-2025 Shanghai Biren Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
from typing import Optional
from fastcore.basics import patch_to
import vllm.envs as envs
from vllm.config import VllmConfig
from vllm.distributed import stateless_destroy_torch_distributed_process_group
from vllm.distributed.parallel_state import get_dp_group
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
from vllm.tracing import init_tracer
from vllm.transformers_utils.tokenizer import init_tokenizer_from_configs
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.llm_engine import LLMEngine
from vllm.v1.engine.output_processor import OutputProcessor
from vllm.v1.engine.processor import Processor
from vllm.v1.executor.abstract import Executor
from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager
from vllm_br import envs as envs_br
from vllm_br.utils import (create_cpu_all_reduce_shared_mem,
get_cpu_all_reduce_shared_mem)
@patch_to(LLMEngine)
def __init__(
self,
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
stat_loggers: Optional[list[StatLoggerFactory]] = None,
mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
use_cached_outputs: bool = False,
multiprocess_mode: bool = False,
) -> None:
if not envs.VLLM_USE_V1:
raise ValueError("Using V1 LLMEngine, but envs.VLLM_USE_V1=False. "
"This should not happen. As a workaround, try using "
"LLMEngine.from_vllm_config(...) or explicitly set "
"VLLM_USE_V1=0 or 1 and report this issue on Github.")
if stat_loggers is not None:
raise NotImplementedError(
"Passing StatLoggers to LLMEngine in V1 is not yet supported. "
"Set VLLM_USE_V1=0 and file and issue on Github.")
if envs_br.VLLM_BR_USE_CPU_ALL_REDUCE != 0:
create_cpu_all_reduce_shared_mem()
self.vllm_config = vllm_config
self.observability_config = vllm_config.observability_config
self.model_config = vllm_config.model_config
self.cache_config = vllm_config.cache_config
self.log_stats = log_stats
executor_backend = (
self.vllm_config.parallel_config.distributed_executor_backend)
parallel_config = vllm_config.parallel_config
self.external_launcher_dp = (parallel_config.data_parallel_size > 1
and executor_backend == "external_launcher")
# important: init dp group before init the engine_core
# In the decoupled engine case this is handled in EngineCoreProc.
if not multiprocess_mode and parallel_config.data_parallel_size > 1 \
and not self.external_launcher_dp:
self.dp_group = parallel_config.stateless_init_dp_group()
else:
self.dp_group = None
self.should_execute_dummy_batch = False
if self.model_config.skip_tokenizer_init:
self.tokenizer = None
else:
# Tokenizer (+ ensure liveness if running in another process).
self.tokenizer = init_tokenizer_from_configs(
model_config=vllm_config.model_config)
# Processor (convert Inputs --> EngineCoreRequests)
self.processor = Processor(vllm_config=vllm_config,
tokenizer=self.tokenizer,
mm_registry=mm_registry)
# OutputProcessor (convert EngineCoreOutputs --> RequestOutput).
self.output_processor = OutputProcessor(self.tokenizer,
log_stats=self.log_stats)
if self.observability_config.otlp_traces_endpoint is not None:
tracer = init_tracer("vllm.llm_engine",
self.observability_config.otlp_traces_endpoint)
self.output_processor.tracer = tracer
# EngineCore (gets EngineCoreRequests and gives EngineCoreOutputs)
self.engine_core = EngineCoreClient.make_client(
multiprocess_mode=multiprocess_mode,
asyncio_mode=False,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=self.log_stats,
)
self.logger_manager: Optional[StatLoggerManager] = None # type: ignore
if self.log_stats:
self.logger_manager = StatLoggerManager(
vllm_config=vllm_config,
custom_stat_loggers=stat_loggers,
enable_default_loggers=log_stats,
)
self.logger_manager.log_engine_initialized()
if not multiprocess_mode:
# for v0 compatibility
self.model_executor = self.engine_core.engine_core.model_executor # type: ignore
if self.external_launcher_dp:
# If we use DP in external launcher mode, we reuse the
# existing DP group used for data communication.
self.dp_group = get_dp_group().cpu_group
# Don't keep the dummy data in memory
self.reset_mm_cache()
@patch_to(LLMEngine)
def __del__(self):
if dp_group := getattr(self, "dp_group",
None) and not self.external_launcher_dp:
stateless_destroy_torch_distributed_process_group(dp_group)
if get_cpu_all_reduce_shared_mem() is not None:
get_cpu_all_reduce_shared_mem()._cleanup()