import os
import queue
import signal
import sys
import threading
import time
from collections import deque
from collections.abc import Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from inspect import isclass, signature
from logging import DEBUG
from typing import Any, Callable, Optional, TypeVar, Union
import msgspec
import zmq
from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.utils import make_zmq_socket
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.engine import (EngineCoreRequest, EngineCoreRequestType)
from vllm.v1.core.kv_cache_utils import (BlockHash,
generate_scheduler_kv_cache_config,
get_kv_cache_configs,
get_request_block_hasher,
init_none_hash)
from vllm.v1.serial_utils import MsgpackDecoder
from vllm.v1.engine.core import EngineCore
from vllm.v1.request import Request, RequestStatus
logger = init_logger(__name__)
POLLING_TIMEOUT_S = 2.5
HANDSHAKE_TIMEOUT_MINS = 5
_R = TypeVar('_R') # Return type for collective_rpc
class EngineCoreProc(EngineCore):
    """ZMQ wrapper for running EngineCore in a background process."""

    ENGINE_CORE_DEAD = b'ENGINE_CORE_DEAD'
def process_input_sockets(self, input_addresses: list[str],
coord_input_address: Optional[str],
identity: bytes, ready_event: threading.Event):
"""Input socket IO thread."""
# Msgpack serialization decoding.
add_request_decoder = MsgpackDecoder(EngineCoreRequest)
generic_decoder = MsgpackDecoder()
bulk_add_decoder = MsgpackDecoder(list[EngineCoreRequest])
with ExitStack() as stack, zmq.Context() as ctx:
input_sockets = [
stack.enter_context(
make_zmq_socket(ctx,
input_address,
zmq.DEALER,
identity=identity,
bind=False))
for input_address in input_addresses
]
if coord_input_address is None:
coord_socket = None
else:
coord_socket = stack.enter_context(
make_zmq_socket(ctx,
coord_input_address,
zmq.XSUB,
identity=identity,
bind=False))
                # Subscribe to everything from the coordinator: an XSUB
                # subscription frame is 0x01 followed by a topic prefix
                # (empty here).
                coord_socket.send(b'\x01')
# Register sockets with poller.
poller = zmq.Poller()
for input_socket in input_sockets:
# Send initial message to each input socket - this is required
# before the front-end ROUTER socket can send input messages
# back to us.
input_socket.send(b'')
poller.register(input_socket, zmq.POLLIN)
if coord_socket is not None:
poller.register(coord_socket, zmq.POLLIN)
ready_event.set()
del ready_event
while True:
for input_socket, _ in poller.poll():
# (RequestType, RequestData)
type_frame, *data_frames = input_socket.recv_multipart(
copy=False)
request_type = EngineCoreRequestType(
bytes(type_frame.buffer))
if request_type == EngineCoreRequestType.ADD_BULK:
                        # Key step: decode the payload as one
                        # list[EngineCoreRequest], then fan the requests out
                        # here in the IO thread so per-request preprocessing
                        # overlaps with the core busy loop.
                        requests = bulk_add_decoder.decode(data_frames)
                        for r in requests:
                            req = self.preprocess_add_request(r)
                            self.input_queue.put_nowait(
                                (EngineCoreRequestType.ADD, req))
                        continue
# Deserialize the request data.
if request_type == EngineCoreRequestType.ADD:
request = add_request_decoder.decode(data_frames)
request = self.preprocess_add_request(request)
else:
request = generic_decoder.decode(data_frames)
# Push to input queue for core busy loop.
self.input_queue.put_nowait((request_type, request))
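
# Illustration only: a minimal sketch of how a front-end could publish an
# ADD_BULK batch to the IO thread above. This is not part of the engine; it
# assumes a ROUTER socket bound at `address`, that ADD_BULK is the one-byte
# request type added to EngineCoreRequestType by this patch, and that
# EngineCoreRequest instances are msgpack-encodable with plain msgspec.
def _send_bulk_add_sketch(ctx: zmq.Context, address: str,
                          requests: list[EngineCoreRequest]) -> None:
    router = ctx.socket(zmq.ROUTER)
    router.bind(address)
    # The engine's DEALER sends an initial empty message (see above); the
    # first frame of it carries the engine's routing identity.
    engine_identity, _ = router.recv_multipart()
    payload = msgspec.msgpack.encode(requests)
    router.send_multipart(
        [engine_identity, EngineCoreRequestType.ADD_BULK.value, payload])
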
class EngineCore:
"""Inner loop of vLLM's Engine."""
def _initialize_kv_caches(
self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
start = time.time()
# Get all kv cache needed by the model
kv_cache_specs = self.model_executor.get_kv_cache_specs()
# get_kv_cache_specs in model_runner
# for layer_name, layer in vllm_config.compilation_config.static_forward_context.items():
# print(f'layer_name = {layer_name}; layer = {layer}')
        # # Is it only MoE layers that fail to expose an attention layer?
        # # TODO: handle models that have no KV cache
# has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
# if has_kv_cache:
# if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
# dp_group = getattr(self, "dp_group", None)
# assert dp_group is not None
# self.available_gpu_memory_for_kv_cache = \
# ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
# available_gpu_memory = [
# self.available_gpu_memory_for_kv_cache
# ] * len(kv_cache_specs)
# else:
# # Profiles the peak memory usage of the model to determine how
# # much memory can be allocated for kv cache.
# available_gpu_memory = (
# self.model_executor.determine_available_memory())
# self.available_gpu_memory_for_kv_cache = \
# available_gpu_memory[0]
# else:
# # Attention free models don't need memory for kv cache
# available_gpu_memory = [0] * len(kv_cache_specs)
        # determine_available_memory_block returns one
        # (available_memory_bytes, num_blocks) pair per rank.
        memory_blocks = self.model_executor.determine_available_memory_block()
        available_gpu_memory = [memory_block[0] for memory_block in memory_blocks]
        num_gpu_blocks = memory_blocks[0][1]
assert len(kv_cache_specs) == len(available_gpu_memory)
kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
available_gpu_memory)
        ### Patch to support long seq_length for MTP: force num_blocks to the
        ### profiled num_gpu_blocks and rescale each KV cache tensor so that
        ### its per-block size stays unchanged.
        for kv_cache_config in kv_cache_configs:
            for kv_cache_tensor in kv_cache_config.kv_cache_tensors:
                kv_cache_tensor.size = (kv_cache_tensor.size * num_gpu_blocks
                                        // kv_cache_config.num_blocks)
            kv_cache_config.num_blocks = num_gpu_blocks
        ### End of long seq_length MTP patch.
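        # Worked example (illustrative numbers only): if get_kv_cache_configs
        # sized a tensor at 2 GiB for num_blocks = 1024 (2 MiB per block) and
        # profiling reports num_gpu_blocks = 4096, the patch rescales it to
        # 2 GiB * 4096 // 1024 = 8 GiB with num_blocks = 4096, keeping the
        # 2 MiB per-block footprint.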
scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
kv_cache_configs)
num_gpu_blocks = scheduler_kv_cache_config.num_blocks
num_cpu_blocks = 0
# Initialize kv cache and warmup the execution
self.model_executor.initialize_from_config(kv_cache_configs)
elapsed = time.time() - start
logger.info(("init engine (profile, create kv cache, "
"warmup model) took %.2f seconds"), elapsed)
return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config
def preprocess_add_request(
self, request: EngineCoreRequest) -> tuple[Request, int]:
"""Preprocess the request.
This function could be directly used in input processing thread to allow
request initialization running in parallel with Model forward
"""
# Note on thread safety: no race condition.
# `mm_receiver_cache` is reset at the end of LLMEngine init,
# and will only be accessed in the input processing thread afterwards.
if self.mm_receiver_cache is not None and request.mm_features:
request.mm_features = (
self.mm_receiver_cache.get_and_update_features(
request.mm_features))
req = Request.from_engine_core_request(request,
self.request_block_hasher)
if req.use_structured_output:
# Note on thread safety: no race condition.
# `grammar_init` is only invoked in input processing thread. For
# `structured_output_manager`, each request is independent and
# grammar compilation is async. Scheduler always checks grammar
# compilation status before scheduling request.
self.structured_output_manager.grammar_init(req)
return req, request.current_wave
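
# Illustration only: a self-contained sketch of the producer/consumer pattern
# that preprocess_add_request enables, using hypothetical stand-ins rather
# than real vLLM calls. An IO thread preprocesses requests and feeds a queue
# that the core busy loop drains between model steps.
def _overlap_sketch() -> None:
    input_queue: queue.Queue = queue.Queue()

    def io_thread(raw_requests: list[str]) -> None:
        for raw in raw_requests:
            # Stand-in for decode + preprocess_add_request().
            input_queue.put_nowait(raw.upper())

    threading.Thread(target=io_thread, args=(["a", "b", "c"],),
                     daemon=True).start()

    # Stand-in for the core busy loop: consume while the model would run.
    for _ in range(3):
        print("scheduling", input_queue.get())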