import os
import queue
import signal
import sys
import threading
import time
from collections import deque
from collections.abc import Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from inspect import isclass, signature
from logging import DEBUG
from typing import Any, Callable, Optional, TypeVar, Union

import msgspec
import zmq

from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.utils import make_zmq_socket
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.engine import (EngineCoreRequest, EngineCoreRequestType)
from vllm.v1.core.kv_cache_utils import (BlockHash,
                                         generate_scheduler_kv_cache_config,
                                         get_kv_cache_configs,
                                         get_request_block_hasher,
                                         init_none_hash)
from vllm.v1.serial_utils import MsgpackDecoder
from vllm.v1.engine.core import EngineCore
from vllm.v1.request import Request, RequestStatus

logger = init_logger(__name__)

POLLING_TIMEOUT_S = 2.5
HANDSHAKE_TIMEOUT_MINS = 5

_R = TypeVar('_R')  # Return type for collective_rpc


class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""

    ENGINE_CORE_DEAD = b'ENGINE_CORE_DEAD'

    def process_input_sockets(self, input_addresses: list[str],
                              coord_input_address: Optional[str],
                              identity: bytes,
                              ready_event: threading.Event):
        """Input socket IO thread."""

        # Msgpack serialization decoding.
        add_request_decoder = MsgpackDecoder(EngineCoreRequest)
        generic_decoder = MsgpackDecoder()
        bulk_add_decoder = MsgpackDecoder(list[EngineCoreRequest])

        with ExitStack() as stack, zmq.Context() as ctx:
            input_sockets = [
                stack.enter_context(
                    make_zmq_socket(ctx,
                                    input_address,
                                    zmq.DEALER,
                                    identity=identity,
                                    bind=False))
                for input_address in input_addresses
            ]
            if coord_input_address is None:
                coord_socket = None
            else:
                coord_socket = stack.enter_context(
                    make_zmq_socket(ctx,
                                    coord_input_address,
                                    zmq.XSUB,
                                    identity=identity,
                                    bind=False))
                # Send subscription message to coordinator.
                coord_socket.send(b'\x01')

            # Register sockets with poller.
            poller = zmq.Poller()
            for input_socket in input_sockets:
                # Send initial message to each input socket - this is
                # required before the front-end ROUTER socket can send
                # input messages back to us.
                input_socket.send(b'')
                poller.register(input_socket, zmq.POLLIN)
            if coord_socket is not None:
                poller.register(coord_socket, zmq.POLLIN)

            ready_event.set()
            del ready_event
            while True:
                for input_socket, _ in poller.poll():
                    # (RequestType, RequestData)
                    type_frame, *data_frames = input_socket.recv_multipart(
                        copy=False)
                    request_type = EngineCoreRequestType(
                        bytes(type_frame.buffer))

                    if request_type == EngineCoreRequestType.ADD_BULK:
                        # Key point: decode as list[EngineCoreRequest], then
                        # fan the requests out in place on this IO thread.
                        requests = bulk_add_decoder.decode(data_frames)
                        for r in requests:
                            r = self.preprocess_add_request(r)
                            self.input_queue.put_nowait(
                                (EngineCoreRequestType.ADD, r))
                        continue

                    # Deserialize the request data.
                    if request_type == EngineCoreRequestType.ADD:
                        request = add_request_decoder.decode(data_frames)
                        request = self.preprocess_add_request(request)
                    else:
                        request = generic_decoder.decode(data_frames)

                    # Push to input queue for core busy loop.
                    self.input_queue.put_nowait((request_type, request))
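
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module's API): how a front-end could
# encode one ADD_BULK message for the IO thread above. It assumes a msgpack
# encoder symmetric to MsgpackDecoder (vllm.v1.serial_utils provides
# MsgpackEncoder, though its encode() return type has varied across vLLM
# versions). A real front-end uses a ROUTER socket and must prepend the
# engine's identity frame; that detail is elided here, so the snippet as
# written matches a plain DEALER test socket.
# ---------------------------------------------------------------------------
def _example_send_bulk_add(socket: zmq.Socket,
                           requests: list[EngineCoreRequest]) -> None:
    """Hypothetical helper: send one ADD_BULK multipart message."""
    from vllm.v1.serial_utils import MsgpackEncoder

    encoder = MsgpackEncoder()
    payload = encoder.encode(requests)  # may be one buffer or several
    # First frame carries the request type, remaining frame(s) the payload,
    # mirroring the `type_frame, *data_frames` unpacking in the IO thread.
    frames: list = [EngineCoreRequestType.ADD_BULK.value]
    if isinstance(payload, (bytes, bytearray, memoryview)):
        frames.append(payload)
    else:
        frames.extend(payload)
    socket.send_multipart(frames, copy=False)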

class EngineCore:
    """Inner loop of vLLM's Engine."""

    def _initialize_kv_caches(
            self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
        start = time.time()

        # Get all KV caches needed by the model.
        # get_kv_cache_specs is implemented in model_runner.
        kv_cache_specs = self.model_executor.get_kv_cache_specs()

        # for layer_name, layer in (vllm_config.compilation_config.
        #                           static_forward_context.items()):
        #     print(f'layer_name = {layer_name}; layer = {layer}')
        #     # Only MoE layers show up here; attention layers are missing?

        # # TODO: handle models without a KV cache.
        # has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
        # if has_kv_cache:
        #     if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
        #         dp_group = getattr(self, "dp_group", None)
        #         assert dp_group is not None
        #         self.available_gpu_memory_for_kv_cache = \
        #             ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
        #         available_gpu_memory = [
        #             self.available_gpu_memory_for_kv_cache
        #         ] * len(kv_cache_specs)
        #     else:
        #         # Profiles the peak memory usage of the model to determine
        #         # how much memory can be allocated for KV cache.
        #         available_gpu_memory = (
        #             self.model_executor.determine_available_memory())
        #         self.available_gpu_memory_for_kv_cache = \
        #             available_gpu_memory[0]
        # else:
        #     # Attention-free models don't need memory for KV cache.
        #     available_gpu_memory = [0] * len(kv_cache_specs)

        # One (available_memory, num_blocks) pair per rank.
        memory_blocks = self.model_executor.determine_available_memory_block()
        available_gpu_memory = [
            memory_block[0] for memory_block in memory_blocks
        ]
        num_gpu_blocks = memory_blocks[0][1]

        assert len(kv_cache_specs) == len(available_gpu_memory)
        kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
                                                available_gpu_memory)

        # Patch to support long seq_length for MTP: rescale each KV cache
        # tensor so every config reports the executor-determined block count
        # while keeping bytes-per-block constant.
        for kv_cache_config in kv_cache_configs:
            for kv_cache_tensor in kv_cache_config.kv_cache_tensors:
                kv_cache_tensor.size = (kv_cache_tensor.size * num_gpu_blocks
                                        // kv_cache_config.num_blocks)
            kv_cache_config.num_blocks = num_gpu_blocks
        # End of patch to support long seq_length for MTP.

        scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
            kv_cache_configs)
        num_gpu_blocks = scheduler_kv_cache_config.num_blocks
        num_cpu_blocks = 0

        # Initialize the KV cache and warm up the execution.
        self.model_executor.initialize_from_config(kv_cache_configs)

        elapsed = time.time() - start
        logger.info(("init engine (profile, create kv cache, "
                     "warmup model) took %.2f seconds"), elapsed)
        return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config

    def preprocess_add_request(
            self, request: EngineCoreRequest) -> tuple[Request, int]:
        """Preprocess the request.

        This function can run directly on the input processing thread,
        allowing request initialization to proceed in parallel with the
        model forward pass.
        """
        # Note on thread safety: no race condition.
        # `mm_receiver_cache` is reset at the end of LLMEngine init,
        # and will only be accessed in the input processing thread
        # afterwards.
        if self.mm_receiver_cache is not None and request.mm_features:
            request.mm_features = (
                self.mm_receiver_cache.get_and_update_features(
                    request.mm_features))

        req = Request.from_engine_core_request(request,
                                               self.request_block_hasher)
        if req.use_structured_output:
            # Note on thread safety: no race condition.
            # `grammar_init` is only invoked in the input processing thread.
            # For `structured_output_manager`, each request is independent
            # and grammar compilation is async. The scheduler always checks
            # grammar compilation status before scheduling a request.
            self.structured_output_manager.grammar_init(req)

        return req, request.current_wave
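
# ---------------------------------------------------------------------------
# Illustrative arithmetic (hypothetical helper, not vLLM API): the MTP patch
# in _initialize_kv_caches rescales each KV cache tensor so the config
# reports the executor-determined block count while bytes-per-block stays
# constant. This standalone function mirrors that integer rescale.
# ---------------------------------------------------------------------------
def _example_rescale(size: int, old_num_blocks: int,
                     new_num_blocks: int) -> int:
    """Hypothetical helper mirroring the in-place rescale above.

    bytes_per_block = size // old_num_blocks is preserved, e.g. a tensor of
    16_384_000 bytes over 1000 blocks (16_384 B/block) grows to 65_536_000
    bytes over 4000 blocks (still 16_384 B/block):

    >>> _example_rescale(16_384_000, 1000, 4000)
    65536000
    """
    return size * new_num_blocks // old_num_blocks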
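
# ---------------------------------------------------------------------------
# Illustrative sketch (stdlib only, all names hypothetical): the docstring of
# preprocess_add_request notes that request initialization can run on the
# input IO thread while the core busy loop drains input_queue. The pattern is
# an ordinary producer/consumer handoff over a thread-safe queue.
# ---------------------------------------------------------------------------
def _example_parallel_preprocess() -> None:
    work: queue.Queue = queue.Queue()

    def io_thread() -> None:
        # Stand-in for decode + preprocess_add_request on the IO thread:
        # each item mimics the (request_type, (Request, wave)) tuples that
        # process_input_sockets enqueues.
        for i in range(3):
            work.put_nowait(("ADD", (f"req-{i}", 0)))
        work.put_nowait(None)  # sentinel to stop the consumer

    threading.Thread(target=io_thread, daemon=True).start()
    while (item := work.get()) is not None:
        # Stand-in for the core busy loop consuming preprocessed requests
        # between forward passes.
        request_type, (req, wave) = item
        print(request_type, req, wave)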