import os
import queue
import signal
import sys
import threading
import time
from collections import deque
from collections.abc import Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from inspect import isclass, signature
from logging import DEBUG
from typing import Any, Callable, Optional, TypeVar, Union

import msgspec
import zmq

from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.utils import make_zmq_socket
from vllm.v1.core.kv_cache_utils import (BlockHash,
                                         generate_scheduler_kv_cache_config,
                                         get_kv_cache_configs,
                                         get_request_block_hasher,
                                         init_none_hash)
from vllm.v1.engine import EngineCoreRequest, EngineCoreRequestType
from vllm.v1.engine.core import EngineCore
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.request import Request, RequestStatus
from vllm.v1.serial_utils import MsgpackDecoder

logger = init_logger(__name__)

POLLING_TIMEOUT_S = 2.5
HANDSHAKE_TIMEOUT_MINS = 5

_R = TypeVar('_R')  # Return type for collective_rpc


class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""

    ENGINE_CORE_DEAD = b'ENGINE_CORE_DEAD'

    def process_input_sockets(self, input_addresses: list[str],
                              coord_input_address: Optional[str],
                              identity: bytes, ready_event: threading.Event):
        """Input socket IO thread."""

        # Msgpack serialization decoding.
        add_request_decoder = MsgpackDecoder(EngineCoreRequest)
        generic_decoder = MsgpackDecoder()
        bulk_add_decoder = MsgpackDecoder(list[EngineCoreRequest])

        with ExitStack() as stack, zmq.Context() as ctx:
            input_sockets = [
                stack.enter_context(
                    make_zmq_socket(ctx,
                                    input_address,
                                    zmq.DEALER,
                                    identity=identity,
                                    bind=False))
                for input_address in input_addresses
            ]
            if coord_input_address is None:
                coord_socket = None
            else:
                coord_socket = stack.enter_context(
                    make_zmq_socket(ctx,
                                    coord_input_address,
                                    zmq.XSUB,
                                    identity=identity,
                                    bind=False))
                # Send subscription message to coordinator.
                coord_socket.send(b'\x01')
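                # (Added note) Per the ZMQ XSUB/XPUB wire protocol, a
                # subscription frame starts with 0x01 (subscribe) or 0x00
                # (unsubscribe) followed by a topic prefix; the empty
                # prefix used here subscribes to every message.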

            # Register sockets with poller.
            poller = zmq.Poller()
            for input_socket in input_sockets:
                # Send initial message to each input socket - this is required
                # before the front-end ROUTER socket can send input messages
                # back to us.
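                # (Added note) A ROUTER socket silently drops outbound
                # messages addressed to an identity it has not yet seen
                # (unless ZMQ_ROUTER_MANDATORY is set), so this empty frame
                # first registers our DEALER identity with the frontend.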
                input_socket.send(b'')
                poller.register(input_socket, zmq.POLLIN)

            if coord_socket is not None:
                poller.register(coord_socket, zmq.POLLIN)

            ready_event.set()
            del ready_event
            while True:
                for input_socket, _ in poller.poll():
                    # (RequestType, RequestData)
                    type_frame, *data_frames = input_socket.recv_multipart(
                        copy=False)
                    request_type = EngineCoreRequestType(
                        bytes(type_frame.buffer))

                    if request_type == EngineCoreRequestType.ADD_BULK:
                        # Key point: decode the payload as
                        # list[EngineCoreRequest], then fan the requests out
                        # here, in the receiving thread.
                        requests = bulk_add_decoder.decode(data_frames)
                        for r in requests:
                            r = self.preprocess_add_request(r)
                            self.input_queue.put_nowait(
                                (EngineCoreRequestType.ADD, r))
                        continue

                    # Deserialize the request data.
                    if request_type == EngineCoreRequestType.ADD:
                        request = add_request_decoder.decode(data_frames)
                        request = self.preprocess_add_request(request)
                    else:
                        request = generic_decoder.decode(data_frames)

                    # Push to input queue for core busy loop.
                    self.input_queue.put_nowait((request_type, request))
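

# A minimal sender-side sketch (added for illustration; not part of the
# original file). It shows how a frontend could frame an ADD_BULK message
# for the IO loop above, assuming vLLM's MsgpackEncoder and a bound ROUTER
# socket. `_example_send_bulk_add` and `engine_identity` are hypothetical
# names for this example.
def _example_send_bulk_add(router_socket: zmq.Socket, engine_identity: bytes,
                           requests: list[EngineCoreRequest]) -> None:
    from vllm.v1.serial_utils import MsgpackEncoder
    encoder = MsgpackEncoder()
    # Frame 0 routes to the engine's DEALER identity; frame 1 carries the
    # request type; the remaining frames are the msgpack-encoded payload
    # that bulk_add_decoder.decode(data_frames) consumes on the other side.
    router_socket.send_multipart(
        (engine_identity, EngineCoreRequestType.ADD_BULK.value,
         *encoder.encode(requests)))

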
class EngineCore:
    """Inner loop of vLLM's Engine."""

    def _initialize_kv_caches(
            self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
        start = time.time()

        # Get all kv cache specs needed by the model.
        kv_cache_specs = self.model_executor.get_kv_cache_specs()

        # get_kv_cache_specs lives in the model runner.
        # for layer_name, layer in vllm_config.compilation_config.static_forward_context.items():
        #     print(f'layer_name = {layer_name}; layer = {layer}')
        #     # Only MoE layers show up here; can't we get the attention
        #     # layers?
        #     # TODO: handle models with no kv cache.

        # has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)

        # if has_kv_cache:
        #     if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
        #         dp_group = getattr(self, "dp_group", None)
        #         assert dp_group is not None
        #         self.available_gpu_memory_for_kv_cache = \
        #             ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
        #         available_gpu_memory = [
        #             self.available_gpu_memory_for_kv_cache
        #         ] * len(kv_cache_specs)
        #     else:
        #         # Profiles the peak memory usage of the model to determine
        #         # how much memory can be allocated for kv cache.
        #         available_gpu_memory = (
        #             self.model_executor.determine_available_memory())
        #         self.available_gpu_memory_for_kv_cache = \
        #             available_gpu_memory[0]
        # else:
        #     # Attention free models don't need memory for kv cache.
        #     available_gpu_memory = [0] * len(kv_cache_specs)

        # Returns one (available_memory, num_blocks) pair per rank.
        memory_blocks = self.model_executor.determine_available_memory_block()
        available_gpu_memory = [
            memory_block[0] for memory_block in memory_blocks
        ]
        num_gpu_blocks = memory_blocks[0][1]
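        # (Added, illustrative) e.g. with two ranks that each report 60 GiB
        # free and room for 8192 blocks:
        #   memory_blocks = [(64424509440, 8192), (64424509440, 8192)]
        #   available_gpu_memory = [64424509440, 64424509440]
        #   num_gpu_blocks = 8192  # taken from rank 0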

        assert len(kv_cache_specs) == len(available_gpu_memory)

        kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
                                                available_gpu_memory)

        ### Patch to support long seq_length for MTP: force every config to
        ### use the profiled block count, rescaling tensor sizes so that the
        ### per-block byte size stays unchanged.
        for kv_cache_config in kv_cache_configs:
            for ii in range(len(kv_cache_config.kv_cache_tensors)):
                kv_cache_config.kv_cache_tensors[ii].size = (
                    kv_cache_config.kv_cache_tensors[ii].size *
                    num_gpu_blocks // kv_cache_config.num_blocks)

            kv_cache_config.num_blocks = num_gpu_blocks
        ### End of patch to support long seq_length for MTP.
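        # (Added, worked example with illustrative numbers) If profiling
        # sized a config at num_blocks=16384 with one 32 GiB kv cache
        # tensor, and num_gpu_blocks from rank 0 is 8192, the tensor is
        # rescaled to 32 GiB * 8192 // 16384 = 16 GiB before num_blocks is
        # overwritten, so bytes-per-block is preserved.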
        scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
            kv_cache_configs)
        num_gpu_blocks = scheduler_kv_cache_config.num_blocks
        num_cpu_blocks = 0

        # Initialize kv cache and warm up the execution.
        self.model_executor.initialize_from_config(kv_cache_configs)

        elapsed = time.time() - start
        logger.info(("init engine (profile, create kv cache, "
                     "warmup model) took %.2f seconds"), elapsed)
        return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config

    def preprocess_add_request(
            self, request: EngineCoreRequest) -> tuple[Request, int]:
        """Preprocess the request.

        This function can run directly in the input processing thread,
        allowing request initialization to proceed in parallel with the
        model forward pass.
        """
        # Note on thread safety: no race condition.
        # `mm_receiver_cache` is reset at the end of LLMEngine init,
        # and will only be accessed in the input processing thread afterwards.
        if self.mm_receiver_cache is not None and request.mm_features:
            request.mm_features = (
                self.mm_receiver_cache.get_and_update_features(
                    request.mm_features))

        req = Request.from_engine_core_request(request,
                                               self.request_block_hasher)
        if req.use_structured_output:
            # Note on thread safety: no race condition.
            # `grammar_init` is only invoked in input processing thread. For
            # `structured_output_manager`, each request is independent and
            # grammar compilation is async. Scheduler always checks grammar
            # compilation status before scheduling request.
            self.structured_output_manager.grammar_init(req)
        return req, request.current_wave
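

# A minimal usage sketch (added for illustration; not part of the original
# file), tying the two pieces above together: the tuple returned by
# preprocess_add_request is enqueued as the payload of an ADD message,
# exactly as process_input_sockets does in its IO loop. `engine_core` and
# `engine_core_request` are hypothetical names.
#
#   req_tuple = engine_core.preprocess_add_request(engine_core_request)
#   engine_core.input_queue.put_nowait(
#       (EngineCoreRequestType.ADD, req_tuple))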