import os
import queue
import signal
import sys
import threading
import time
from collections import deque
from collections.abc import Generator
from concurrent.futures import Future
from contextlib import ExitStack, contextmanager
from inspect import isclass, signature
from logging import DEBUG
from typing import Any, Callable, Optional, TypeVar, Union

import msgspec
import zmq

from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import init_logger
from vllm.utils import make_zmq_socket
from vllm.v1.core.kv_cache_utils import (BlockHash,
                                         generate_scheduler_kv_cache_config,
                                         get_kv_cache_configs,
                                         get_request_block_hasher,
                                         init_none_hash)
from vllm.v1.engine import EngineCoreRequest, EngineCoreRequestType
from vllm.v1.engine.core import EngineCore
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.request import Request, RequestStatus
from vllm.v1.serial_utils import MsgpackDecoder

logger = init_logger(__name__)

POLLING_TIMEOUT_S = 2.5
HANDSHAKE_TIMEOUT_MINS = 5

_R = TypeVar('_R')  # Return type for collective_rpc


class EngineCoreProc(EngineCore):
    """ZMQ-wrapper for running EngineCore in background process."""

    ENGINE_CORE_DEAD = b'ENGINE_CORE_DEAD'

    def process_input_sockets(self, input_addresses: list[str],
                              coord_input_address: Optional[str],
                              identity: bytes, ready_event: threading.Event):
        """Input socket IO thread."""

        # Msgpack serialization decoding.
        add_request_decoder = MsgpackDecoder(EngineCoreRequest)
        generic_decoder = MsgpackDecoder()
        bulk_add_decoder = MsgpackDecoder(list[EngineCoreRequest])

        with ExitStack() as stack, zmq.Context() as ctx:
            input_sockets = [
                stack.enter_context(
                    make_zmq_socket(ctx,
                                    input_address,
                                    zmq.DEALER,
                                    identity=identity,
                                    bind=False))
                for input_address in input_addresses
            ]
            if coord_input_address is None:
                coord_socket = None
            else:
                coord_socket = stack.enter_context(
                    make_zmq_socket(ctx,
                                    coord_input_address,
                                    zmq.XSUB,
                                    identity=identity,
                                    bind=False))
                # Send subscription message to coordinator.
                coord_socket.send(b'\x01')
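                # (In ZMQ's XPUB/XSUB protocol, a frame whose first byte is
                # 0x01 is a subscription; with no topic bytes following, it
                # subscribes to everything the coordinator publishes.)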

            # Register sockets with poller.
            poller = zmq.Poller()
            for input_socket in input_sockets:
                # Send initial message to each input socket - this is
                # required before the front-end ROUTER socket can send
                # input messages back to us (see the standalone sketch
                # after this class).
                input_socket.send(b'')
                poller.register(input_socket, zmq.POLLIN)

            if coord_socket is not None:
                poller.register(coord_socket, zmq.POLLIN)

            ready_event.set()
            del ready_event
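            # (Dropping our reference lets the Event be garbage-collected
            # once the caller is done with it; this thread never needs it
            # again.)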
            while True:
                for input_socket, _ in poller.poll():
                    # (RequestType, RequestData)
                    type_frame, *data_frames = input_socket.recv_multipart(
                        copy=False)
                    request_type = EngineCoreRequestType(
                        bytes(type_frame.buffer))

                    if request_type == EngineCoreRequestType.ADD_BULK:
                        # Key point: decode the payload as
                        # list[EngineCoreRequest], then fan out in place on
                        # this receiving thread, enqueueing each request
                        # individually (see the bulk-decode sketch after
                        # this class).
                        requests = bulk_add_decoder.decode(data_frames)
                        for r in requests:
                            r = self.preprocess_add_request(r)
                            self.input_queue.put_nowait(
                                (EngineCoreRequestType.ADD, r))
                        continue

                    # Deserialize the request data.
                    if request_type == EngineCoreRequestType.ADD:
                        request = add_request_decoder.decode(data_frames)
                        request = self.preprocess_add_request(request)
                    else:
                        request = generic_decoder.decode(data_frames)

                    # Push to input queue for core busy loop.
                    self.input_queue.put_nowait((request_type, request))
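

# A minimal standalone sketch (not part of vLLM) of why each DEALER socket
# above sends an empty message before polling: a ROUTER socket can only
# address a peer after it has seen at least one message carrying that peer's
# identity; until then, sends to that identity are silently dropped. Assumes
# pyzmq; the address and identity are illustrative.
def _router_dealer_handshake_sketch():
    ctx = zmq.Context.instance()
    router = ctx.socket(zmq.ROUTER)
    router.bind("inproc://handshake-sketch")
    dealer = ctx.socket(zmq.DEALER)
    dealer.setsockopt(zmq.IDENTITY, b'engine-0')
    dealer.connect("inproc://handshake-sketch")

    # The empty message teaches the ROUTER the identity b'engine-0'.
    dealer.send(b'')
    identity, _ = router.recv_multipart()

    # Only now can the ROUTER route a request to this specific engine.
    router.send_multipart([identity, b'request-payload'])
    assert dealer.recv() == b'request-payload'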
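

# Minimal sketch of the bulk-decode pattern used by ADD_BULK above, written
# against plain msgspec rather than vLLM's MsgpackDecoder wrapper; `Req` is
# a stand-in struct, not a vLLM type. A single decoder typed as list[T]
# turns one wire payload into a batch of objects in one pass.
def _bulk_decode_sketch():
    class Req(msgspec.Struct):
        request_id: str
        prompt: str

    wire = msgspec.msgpack.encode([Req('a', 'hi'), Req('b', 'bye')])

    decoder = msgspec.msgpack.Decoder(list[Req])
    for r in decoder.decode(wire):
        print(r.request_id)  # fan out: preprocess/enqueue each one here

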
class EngineCore:
    """Inner loop of vLLM's Engine."""

    def _initialize_kv_caches(
            self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
        start = time.time()

        # Get all kv cache specs needed by the model.
        kv_cache_specs = self.model_executor.get_kv_cache_specs()
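        # (One spec mapping per worker rank, as far as this patch assumes;
        # the assert further down checks that profiling returned a memory
        # figure for each rank.)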

        # Debug notes on get_kv_cache_specs in model_runner:
        # for layer_name, layer in vllm_config.compilation_config.static_forward_context.items():
        #     print(f'layer_name = {layer_name}; layer = {layer}')
        #     # Only MoE layers show up here; the attention layers cannot be
        #     # reached this way?
        #     # TODO: handle models with no KV cache.

        # has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)

        # if has_kv_cache:
        #     if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
        #         dp_group = getattr(self, "dp_group", None)
        #         assert dp_group is not None
        #         self.available_gpu_memory_for_kv_cache = \
        #             ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
        #         available_gpu_memory = [
        #             self.available_gpu_memory_for_kv_cache
        #         ] * len(kv_cache_specs)
        #     else:
        #         # Profiles the peak memory usage of the model to determine
        #         # how much memory can be allocated for kv cache.
        #         available_gpu_memory = (
        #             self.model_executor.determine_available_memory())
        #         self.available_gpu_memory_for_kv_cache = \
        #             available_gpu_memory[0]
        # else:
        #     # Attention free models don't need memory for kv cache
        #     available_gpu_memory = [0] * len(kv_cache_specs)

        # [(memory, blocks) * rank_number]
        memory_blocks = self.model_executor.determine_available_memory_block()
        available_gpu_memory = [
            memory_block[0] for memory_block in memory_blocks
        ]
        num_gpu_blocks = memory_blocks[0][1]
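        # Illustrative shape (numbers are made up): with two ranks,
        # memory_blocks might be [(8 * 2**30, 4096), (8 * 2**30, 4096)],
        # i.e. bytes available for KV cache and the profiled block count
        # per rank; only rank 0's block count is used below.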

        assert len(kv_cache_specs) == len(available_gpu_memory)

        kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
                                                available_gpu_memory)

        ### Patch to support long seq_length for MTP: force every config to
        ### the profiled block count, rescaling tensor sizes so that
        ### bytes-per-block stays unchanged.
        for kv_cache_config in kv_cache_configs:
            for ii in range(len(kv_cache_config.kv_cache_tensors)):
                kv_cache_config.kv_cache_tensors[ii].size = (
                    kv_cache_config.kv_cache_tensors[ii].size *
                    num_gpu_blocks // kv_cache_config.num_blocks)

            kv_cache_config.num_blocks = num_gpu_blocks
        ### End of patch to support long seq_length for MTP.
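        # Worked example for the rescaling above (hypothetical numbers): a
        # tensor sized 16 GiB for num_blocks=8192 with num_gpu_blocks=10240
        # becomes 16 GiB * 10240 // 8192 = 20 GiB, so each block still maps
        # to the same number of bytes after num_blocks is overwritten.
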
        scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
            kv_cache_configs)
        num_gpu_blocks = scheduler_kv_cache_config.num_blocks
        num_cpu_blocks = 0

        # Initialize kv cache and warm up the execution.
        self.model_executor.initialize_from_config(kv_cache_configs)

        elapsed = time.time() - start
        logger.info(("init engine (profile, create kv cache, "
                     "warmup model) took %.2f seconds"), elapsed)
        return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config

    def preprocess_add_request(
            self, request: EngineCoreRequest) -> tuple[Request, int]:
        """Preprocess the request.

        This function can be called directly from the input processing
        thread, so that request initialization runs in parallel with the
        model forward pass (see the overlap sketch at the end of this file).
        """
        # Note on thread safety: no race condition.
        # `mm_receiver_cache` is reset at the end of LLMEngine init,
        # and will only be accessed in the input processing thread afterwards.
        if self.mm_receiver_cache is not None and request.mm_features:
            request.mm_features = (
                self.mm_receiver_cache.get_and_update_features(
                    request.mm_features))

        req = Request.from_engine_core_request(request,
                                               self.request_block_hasher)
        if req.use_structured_output:
            # Note on thread safety: no race condition.
            # `grammar_init` is only invoked in input processing thread. For
            # `structured_output_manager`, each request is independent and
            # grammar compilation is async. Scheduler always checks grammar
            # compilation status before scheduling request.
            self.structured_output_manager.grammar_init(req)
        return req, request.current_wave
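

# Minimal standalone sketch (not vLLM code) of the overlap that
# preprocess_add_request enables: an IO thread does per-request
# preprocessing before enqueueing, so the busy loop only pops
# ready-to-schedule work between forward passes. Names and timings are
# illustrative.
def _overlap_sketch():
    work: queue.Queue = queue.Queue()

    def io_thread():
        for raw in ('req-0', 'req-1', 'req-2'):
            prepared = raw.upper()  # stand-in for tokenization/hashing
            work.put_nowait(prepared)

    threading.Thread(target=io_thread, daemon=True).start()
    for _ in range(3):
        req = work.get()  # busy loop does only a cheap dequeue
        time.sleep(0.01)  # stand-in for a model forward pass
        print('scheduled', req)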