# Engine-core request/response message definitions (msgspec structs) for vLLM v1.
import enum
import time
from collections.abc import Mapping
from typing import Any, Optional, Union

import msgspec
import torch

from vllm.lora.request import LoRARequest
from vllm.multimodal.inputs import MultiModalFeatureSpec
from vllm.pooling_params import PoolingParams
from vllm.sampling_params import SamplingParams
from vllm.v1.outputs import LogprobsLists, LogprobsTensors
from vllm.v1.engine import EngineCoreOutput, UtilityOutput

from vllm_vacc.vllm.v1.metrics.stats import SchedulerStats

# These are possible values of RequestOutput.finish_reason,
# so form part of the external API.
FINISH_REASON_STRINGS = ("stop", "length", "abort")
class EngineCoreRequest(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """A single request sent from the front-end to the engine core.

    Encoded with msgspec. ``array_like=True`` serializes instances as a
    positional array, so the FIELD ORDER below is part of the wire
    format — do not reorder fields. ``omit_defaults=True`` drops
    trailing default-valued fields from the encoding; ``gc=False``
    excludes instances from garbage-collector tracking.
    """

    # Unique identifier for this request.
    request_id: str
    # Token ids of the prompt; presumably None when prompt_embeds is
    # supplied instead — TODO confirm with callers.
    prompt_token_ids: Optional[list[int]]
    # Multi-modal inputs attached to the request, if any.
    mm_features: Optional[list[MultiModalFeatureSpec]]
    # NOTE(review): looks like exactly one of sampling_params /
    # pooling_params is set per request (generation vs. pooling);
    # not enforced here — verify against the scheduler.
    sampling_params: Optional[SamplingParams]
    pooling_params: Optional[PoolingParams]
    # End-of-sequence token id for this request's tokenizer, if known.
    eos_token_id: Optional[int]
    # Time the request arrived at the front-end.
    arrival_time: float
    # LoRA adapter to apply, if any.
    lora_request: Optional[LoRARequest]
    # Salt mixed into prefix-cache keys, if provided — assumed to
    # isolate cache entries between tenants; TODO confirm.
    cache_salt: Optional[str]
    # Target data-parallel rank, if pinned by the caller.
    data_parallel_rank: Optional[int]
    # Precomputed prompt embeddings (alternative to prompt_token_ids).
    prompt_embeds: Optional[torch.Tensor] = None
    deepstack_input_embeds: Optional[torch.Tensor] = None
    # Index of the client, used to ensure outputs are sent back to the same
    # client for this request when scaling out the front-end.
    client_index: int = 0

    # Used in DP case to indicate which wave of requests this is expected to
    # belong to, to cover a race condition where the request is sent before
    # a wave finished notification is received.
    current_wave: int = 0
    # Scheduling priority; semantics (higher vs. lower first) are
    # defined by the scheduler, not here.
    priority: int = 0

    # Distributed-tracing headers to propagate, if tracing is enabled.
    trace_headers: Optional[Mapping[str, str]] = None
class EngineCoreOutputs(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """A batch of outputs sent from the engine core to the front-end.

    Encoded with msgspec; ``array_like=True`` means field order is part
    of the wire format — do not reorder fields.
    """

    # NOTE(Nick): We could consider ways to make this more compact,
    # e.g. columnwise layout

    # Index of the engine core that produced this batch (relevant when
    # several engine cores feed one front-end).
    engine_index: int = 0

    # [num_reqs]
    # NOTE: a mutable [] default is safe here — msgspec.Struct copies
    # defaults per instance, unlike plain class attributes.
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: Optional[SchedulerStats] = None
    # Send time on the producer's monotonic clock; 0.0 is a sentinel
    # replaced in __post_init__.
    timestamp: float = 0.0

    # Result of an out-of-band utility call, if this message carries one.
    utility_output: Optional[UtilityOutput] = None
    # Presumably the ids of requests that finished since the previous
    # message — confirm against the scheduler's abort/finish path.
    finished_requests: Optional[set[str]] = None

    # In DP case, used to signal that the current wave of requests
    # has finished and the engines are paused.
    wave_complete: Optional[int] = None
    # In DP case, used to signal that a request was received for an
    # "old" wave, so the next wave needs to be started in other engines.
    start_wave: Optional[int] = None

    def __post_init__(self):
        # Stamp with the local monotonic clock unless the caller
        # supplied an explicit (non-sentinel) timestamp.
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()
class EngineCoreRequestType(enum.Enum):
    """
    Request types defined as hex byte strings, so it can be sent over sockets
    without separate encoding step.
    """
    # Add a new request to the engine core.
    ADD = b'\x00'
    # Abort an in-flight request.
    ABORT = b'\x01'
    # Start the next data-parallel wave.
    START_DP_WAVE = b'\x02'
    # Out-of-band utility call.
    UTILITY = b'\x03'
    # Sentinel used within EngineCoreProc.
    EXECUTOR_FAILED = b'\x04'
    # Presumably a batched variant of ADD carrying multiple requests in
    # one message — confirm against the EngineCoreProc handler.
    ADD_BULK = b'\x05'