Upgrade to vllm 0.17.0 corex v4.1 overlay

2026-04-29 19:38:22 +08:00
parent 8fac6062e4
commit 938d0854a5
430 changed files with 35969 additions and 14511 deletions

View File

@@ -27,12 +27,21 @@ PauseMode = Literal["abort", "wait", "keep"]
# These are possible values of RequestOutput.finish_reason,
# so form part of the external API.
FINISH_REASON_STRINGS = ("stop", "length", "abort", "error")
FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")
EEP_NOTIFICATION_CALL_ID = -1
class EEPNotificationType(enum.Enum):
NEW_CORE_ENGINES_INIT_READY = "NEW_CORE_ENGINES_INIT_READY"
NEW_CORE_ENGINES_WEIGHTS_INIT_READY = "NEW_CORE_ENGINES_WEIGHTS_INIT_READY"
RECONFIGURE_FINISHED = "RECONFIGURE_FINISHED"
SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
class FinishReason(enum.IntEnum):
"""
Reason a request finished - stop, length, abort, or error.
Reason a request finished - stop, length, abort, error, or repetition.
Int rather than Str for more compact serialization.
@@ -41,6 +50,7 @@ class FinishReason(enum.IntEnum):
abort - aborted by client
error - retryable request-level internal error (e.g., KV load failure).
Invariant: always converted to 500 Internal Server Error.
repetition - repetitive token pattern detected (hallucination)
"""
@@ -48,6 +58,7 @@ class FinishReason(enum.IntEnum):
LENGTH = 1
ABORT = 2
ERROR = 3
REPETITION = 4
def __str__(self):
return FINISH_REASON_STRINGS[self.value]
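For reference, the enum keeps the compact int-for-serialization, string-for-the-external-API contract after adding REPETITION. A minimal self-contained sketch of the round trip, mirroring the tuple and enum in this hunk:

```python
import enum

# Mirrors the diff above: the tuple and the IntEnum must stay in sync.
FINISH_REASON_STRINGS = ("stop", "length", "abort", "error", "repetition")

class FinishReason(enum.IntEnum):
    STOP = 0
    LENGTH = 1
    ABORT = 2
    ERROR = 3
    REPETITION = 4

    def __str__(self):
        return FINISH_REASON_STRINGS[self.value]

# Compact int on the wire, readable string at the API boundary.
reason = FinishReason.REPETITION
assert int(reason) == 4
assert str(reason) == "repetition"
```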
@@ -235,6 +246,11 @@ class ReconfigureDistributedRequest(msgspec.Struct):
new_data_parallel_rank_local: int
new_data_parallel_master_ip: str
new_data_parallel_master_port: int
new_data_parallel_master_port_list: list[int]
new_stateless_world_group_port_list: list[list[int]]
new_stateless_dp_group_port_list: list[list[int]]
new_stateless_ep_group_port_list: list[list[int]]
new_stateless_eplb_group_port_list: list[list[int]]
class ReconfigureRankType(enum.IntEnum):

View File

@@ -20,6 +20,7 @@ from vllm.distributed.weight_transfer.base import (
)
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.protocol import EngineClient, StreamingInput
from vllm.entrypoints.serve.elastic_ep.middleware import set_scaling_elastic_ep
from vllm.inputs import ProcessorInputs, PromptType
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
@@ -134,6 +135,7 @@ class AsyncLLM(EngineClient):
self.renderer = renderer = renderer_from_config(self.vllm_config)
self.io_processor = get_io_processor(
self.vllm_config,
self.renderer,
self.model_config.io_processor_plugin,
)
@@ -647,7 +649,11 @@ class AsyncLLM(EngineClient):
engine_core = self.engine_core
output_processor = self.output_processor
log_stats = self.log_stats
logger_manager = self.logger_manager
# We use a mutable list for logger_manager so that it can be updated
# during elastic EP scaling (see scale_elastic_ep) without creating
# a circular reference via self.
self._logger_ref = [self.logger_manager]
logger_ref = self._logger_ref
renderer = self.renderer
chunk_size = envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE
@@ -691,8 +697,8 @@ class AsyncLLM(EngineClient):
# 4) Logging.
# TODO(rob): make into a coroutine and launch it in
# background thread once Prometheus overhead is non-trivial.
if logger_manager:
logger_manager.record(
if logger_ref[0]:
logger_ref[0].record(
engine_idx=outputs.engine_index,
scheduler_stats=outputs.scheduler_stats,
iteration_stats=iteration_stats,
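The handler closes over a one-element list rather than `self`, so scale_elastic_ep can swap in a new stat logger that the already-running output handler picks up on its next iteration, without creating a circular reference. A minimal sketch of the mutable-cell pattern (the Recorder class and names are illustrative, not vLLM's API):

```python
import asyncio

class Recorder:
    def __init__(self, name: str):
        self.name = name

    def record(self, value: int) -> None:
        print(f"{self.name} recorded {value}")

async def main() -> None:
    # A single-element list acts as a mutable cell: the background task reads
    # logger_ref[0] on each iteration instead of capturing the object itself.
    logger_ref = [Recorder("old")]

    async def output_handler() -> None:
        for i in range(4):
            if logger_ref[0]:
                logger_ref[0].record(i)
            await asyncio.sleep(0.01)

    task = asyncio.create_task(output_handler())
    await asyncio.sleep(0.02)
    logger_ref[0] = Recorder("new")  # swap loggers mid-flight, e.g. after scaling
    await task

asyncio.run(main())
```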
@@ -976,17 +982,13 @@ class AsyncLLM(EngineClient):
new_data_parallel_size,
)
return
logger.info(
"Waiting for requests to drain before scaling up to %s engines...",
new_data_parallel_size,
)
await self.wait_for_requests_to_drain(drain_timeout)
logger.info(
"Requests have been drained, proceeding with scale to %s engines",
new_data_parallel_size,
)
await self.engine_core.scale_elastic_ep(new_data_parallel_size)
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
if envs.VLLM_ELASTIC_EP_DRAIN_REQUESTS:
logger.info(
"VLLM_ELASTIC_EP_DRAIN_REQUESTS is set, "
"waiting for requests to drain before scaling"
)
await self.wait_for_requests_to_drain(drain_timeout)
# recreate stat loggers
if new_data_parallel_size > old_data_parallel_size and self.log_stats:
@@ -999,6 +1001,18 @@ class AsyncLLM(EngineClient):
engine_idxs=list(range(new_data_parallel_size)),
custom_stat_loggers=None,
)
# Update the mutable ref so output_handler picks up the
# new logger without creating a circular reference via self.
if hasattr(self, "_logger_ref"):
self._logger_ref[0] = self.logger_manager
self.logger_manager.log_engine_initialized()
set_scaling_elastic_ep(True)
try:
await self.engine_core.scale_elastic_ep(new_data_parallel_size)
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
finally:
set_scaling_elastic_ep(False)
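The try/finally guarantees the scaling flag set via set_scaling_elastic_ep is cleared even when reconfiguration raises. The same guard could be written as a context manager; a minimal sketch under that assumption (the flag and helper below are local stand-ins, not the actual middleware module):

```python
from contextlib import contextmanager

_scaling = False  # stand-in for the middleware's scaling state

def set_scaling_elastic_ep(value: bool) -> None:
    global _scaling
    _scaling = value

@contextmanager
def scaling_elastic_ep():
    # Equivalent to the explicit try/finally in scale_elastic_ep:
    # the flag is always reset, even if the scale operation raises.
    set_scaling_elastic_ep(True)
    try:
        yield
    finally:
        set_scaling_elastic_ep(False)

# Usage sketch (the real call site awaits engine_core.scale_elastic_ep):
with scaling_elastic_ep():
    print("scaling while flag is", _scaling)
print("after scaling, flag is", _scaling)
```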
@property
def is_running(self) -> bool:

View File

@@ -71,6 +71,9 @@ class DPCoordinator:
)
local_only_eng = dp_size == parallel_config.data_parallel_size_local
# NOTE(yongji): handling scaling from intra-node to inter-node
if parallel_config.enable_elastic_ep:
local_only_eng = False
back_publish_address = get_engine_client_zmq_addr(local_only_eng, host)
back_output_address = get_engine_client_zmq_addr(local_only_eng, host)
@@ -201,6 +204,7 @@ class DPCoordinatorProc:
poller = zmq.Poller()
poller.register(publish_front, zmq.POLLIN)
poller.register(publish_back, zmq.POLLIN)
poller.register(output_back, zmq.POLLIN)
last_publish_time = 0
while True:
@@ -231,6 +235,22 @@ class DPCoordinatorProc:
events = dict(events)
wave_state_changed = False
if publish_back in events:
buffer = publish_back.recv()
if buffer == b"\x01":
# NOTE(yongji): a newly started engine has subscribed.
# We send the READY message here rather than waiting for the
# SCALE_ELASTIC_EP notification from the engine core client,
# because SCALE_ELASTIC_EP is only sent after the new engines
# have finished initialization, whereas the subscription message
# is sent by each engine during initialization.
publish_back.send(b"READY")
else:
logger.error(
"DP Coordinator receives unexpected message from engines"
)
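For context, an XPUB socket surfaces a subscriber's subscription as a message whose first byte is 0x01, which is what the coordinator reacts to here. A minimal pyzmq sketch of that handshake (the inproc endpoint and socket wiring are illustrative, not the coordinator's real addresses):

```python
import zmq

ctx = zmq.Context.instance()

# Coordinator side: XPUB sockets deliver subscriptions as messages whose
# first byte is 0x01 (subscribe) or 0x00 (unsubscribe).
publish_back = ctx.socket(zmq.XPUB)
publish_back.bind("inproc://dp-coordinator-back")

# Newly started engine side: subscribing triggers the 0x01 message above.
engine_sub = ctx.socket(zmq.SUB)
engine_sub.connect("inproc://dp-coordinator-back")
engine_sub.setsockopt(zmq.SUBSCRIBE, b"")

buffer = publish_back.recv()       # b"\x01" from the new subscriber
if buffer == b"\x01":
    publish_back.send(b"READY")    # greet the newly started engine
    print("new engine subscribed, READY sent")

print(engine_sub.recv())           # the engine receives b"READY"
```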
if publish_front in events:
buffer = publish_front.recv()
if buffer in (b"\x01", b"\x00"):
@@ -259,7 +279,6 @@ class DPCoordinatorProc:
# current_wave
# we note that 0 is the wave number for the new
# engine
engines_running = False
logger.info(
"DPCoordinator scaled up from %s to %s engines",
current_count,

View File

@@ -17,6 +17,7 @@ from typing import Any, TypeVar, cast
import msgspec
import zmq
import vllm.envs as envs
from vllm.config import ParallelConfig, VllmConfig
from vllm.distributed import stateless_destroy_torch_distributed_process_group
from vllm.envs import enable_envs_cache
@@ -44,6 +45,8 @@ from vllm.v1.core.kv_cache_utils import (
from vllm.v1.core.sched.interface import PauseState, SchedulerInterface
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.engine import (
EEP_NOTIFICATION_CALL_ID,
EEPNotificationType,
EngineCoreOutput,
EngineCoreOutputs,
EngineCoreRequest,
@@ -72,7 +75,6 @@ from vllm.version import __version__ as VLLM_VERSION
logger = init_logger(__name__)
POLLING_TIMEOUT_S = 2.5
HANDSHAKE_TIMEOUT_MINS = 5
_R = TypeVar("_R") # Return type for collective_rpc
@@ -111,6 +113,9 @@ class EngineCore:
self.available_gpu_memory_for_kv_cache = -1
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self._eep_scale_up_before_kv_init()
# Setup KV Caches and update CacheConfig after profiling.
num_gpu_blocks, num_cpu_blocks, kv_cache_config = self._initialize_kv_caches(
vllm_config
@@ -180,13 +185,55 @@ class EngineCore:
# Batch queue for scheduled batches. This enables us to asynchronously
# schedule and execute batches, and is required by pipeline parallelism
# to eliminate pipeline bubbles.
self.batch_queue_size = self.model_executor.max_concurrent_batches
base_batch_queue_size = self.model_executor.max_concurrent_batches
if envs.VLLM_ENABLE_PP_ILU_OPT:
self.batch_queue_size = envs.VLLM_PP_ILU_OPT_BATCH_QUEUE_SIZE
if self.batch_queue_size <= 0:
self.batch_queue_size = base_batch_queue_size * 2
self._use_batch_queue_ilu_opt = True
logger.info(
"PP ILU opt is enabled: batch_queue_size=%d (base=%d)",
self.batch_queue_size,
base_batch_queue_size,
)
else:
self.batch_queue_size = base_batch_queue_size
self._use_batch_queue_ilu_opt = False
self.batch_queue: (
deque[tuple[Future[ModelRunnerOutput], SchedulerOutput, Future[Any]]] | None
) = None
if self.batch_queue_size > 1:
logger.debug("Batch queue is enabled with size %d", self.batch_queue_size)
logger.info(
"Batch queue is enabled with size %d (ilu_opt=%s)",
self.batch_queue_size,
self._use_batch_queue_ilu_opt,
)
self.batch_queue = deque(maxlen=self.batch_queue_size)
if self._use_batch_queue_ilu_opt:
self.engine_core_input_queue: queue.Queue[
tuple[Future[ModelRunnerOutput], SchedulerOutput]
] = queue.Queue(maxsize=self.batch_queue_size)
self.engine_core_output_queue: queue.Queue[
tuple[SchedulerOutput, ModelRunnerOutput, bool]
] = queue.Queue(maxsize=self.batch_queue_size)
self._batch_queue_loop_thread = threading.Thread(
target=self._process_batch_queue_loop,
daemon=True,
)
self._batch_queue_loop_thread.start()
# When PP mix ILU scheduling or PP ILU opt is enabled with a KV
# connector, only NixlConnector is supported.
if vllm_config.kv_transfer_config is not None and (
envs.VLLM_ENABLE_PP_MIX_ILU_SCHEDULING or envs.VLLM_ENABLE_PP_ILU_OPT
):
kv_connector_name = vllm_config.kv_transfer_config.kv_connector
if kv_connector_name != "NixlConnector":
raise ValueError(
"When VLLM_ENABLE_PP_MIX_ILU_SCHEDULING or VLLM_ENABLE_PP_ILU_OPT "
"is enabled with a KV connector, only NixlConnector is supported; "
f"current kv_connector is {kv_connector_name!r}."
)
self.is_ec_producer = (
vllm_config.ec_transfer_config is not None
@@ -209,6 +256,10 @@ class EngineCore:
self.step if self.batch_queue is None else self.step_with_batch_queue
)
self.async_scheduling = vllm_config.scheduler_config.async_scheduling
self.draft_in_model_output = (
self.batch_queue is not None and self.use_spec_decode
)
self.aborts_queue = queue.Queue[list[str]]()
@@ -234,12 +285,10 @@ class EngineCore:
has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
if has_kv_cache:
if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
dp_group = getattr(self, "dp_group", None)
assert dp_group is not None
self.available_gpu_memory_for_kv_cache = (
ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
)
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
# NOTE(yongji): should already be set
# during _eep_scale_up_before_kv_init
assert self.available_gpu_memory_for_kv_cache > 0
available_gpu_memory = [self.available_gpu_memory_for_kv_cache] * len(
kv_cache_specs
)
@@ -408,12 +457,52 @@ class EngineCore:
# When using async scheduling we can't get draft token ids in advance,
# so we update draft token ids in the worker process and don't
# need to update draft token ids here.
if self.draft_in_model_output:
return
if not self.async_scheduling and self.use_spec_decode and model_executed:
# Take the draft token ids.
draft_token_ids = self.model_executor.take_draft_token_ids()
if draft_token_ids is not None:
self.scheduler.update_draft_token_ids(draft_token_ids)
def _has_kv_connector_work(self, meta: Any) -> bool:
"""Return True if kv_connector_metadata has any recv/save/send work."""
if meta is None:
return False
for attr in ("reqs_to_recv", "reqs_to_save", "reqs_to_send"):
val = getattr(meta, attr, None)
if val is not None and len(val) > 0:
return True
return False
def _has_meaningful_scheduler_output(
self, scheduler_output: SchedulerOutput
) -> bool:
"""Return False if scheduler_output is effectively empty."""
return not (
len(scheduler_output.scheduled_new_reqs) == 0
and len(scheduler_output.scheduled_cached_reqs.req_ids) == 0
and len(scheduler_output.num_scheduled_tokens) == 0
and scheduler_output.total_num_scheduled_tokens == 0
and len(scheduler_output.scheduled_spec_decode_tokens) == 0
and len(scheduler_output.scheduled_encoder_inputs) == 0
and len(scheduler_output.finished_req_ids) == 0
and (scheduler_output.scheduled_resumed_reqs is None
or len(scheduler_output.scheduled_resumed_reqs) == 0)
and not self._has_kv_connector_work(
scheduler_output.kv_connector_metadata
)
)
def _process_batch_queue_loop(self) -> None:
while True:
future, scheduler_output = self.engine_core_input_queue.get()
with self.log_error_detail(scheduler_output):
model_output = future.result()
self.engine_core_output_queue.put(
(scheduler_output, model_output, False)
)
def step_with_batch_queue(
self,
) -> tuple[dict[int, EngineCoreOutputs] | None, bool]:
@@ -434,6 +523,9 @@ class EngineCore:
batch_queue = self.batch_queue
assert batch_queue is not None
if self._use_batch_queue_ilu_opt:
return self.step_with_batch_queue_ilu_opt()
# Try to schedule a new batch if the batch queue is not full, but
# the scheduler may return an empty batch if all requests are scheduled.
# Note that this is not blocking.
@@ -531,6 +623,96 @@ class EngineCore:
return engine_core_outputs, model_executed
def step_with_batch_queue_ilu_opt(
self,
) -> tuple[dict[int, EngineCoreOutputs] | None, bool]:
"""Async batch queue variant using background thread for PP ILU opt.
Uses engine_core_input_queue / engine_core_output_queue with a
background thread (_process_batch_queue_loop) that blocks on
future.result(), so the main thread never blocks on GPU compute.
"""
assert not self.is_ec_producer, (
"ec_producer is not supported in step_with_batch_queue_ilu_opt"
)
assert not self.is_pooling_model, (
"is_pooling_model is not supported in step_with_batch_queue_ilu_opt"
)
assert not self.async_scheduling, (
"async_scheduling is not supported in step_with_batch_queue_ilu_opt"
)
model_executed = False
if self.scheduler.has_requests():
scheduler_output = self.scheduler.schedule()
has_meaningful_schedule = self._has_meaningful_scheduler_output(
scheduler_output
)
if (
self.engine_core_input_queue.qsize() <= 1
and not has_meaningful_schedule
):
has_meaningful_schedule = True
if has_meaningful_schedule:
logger.debug(
"[step_with_batch_queue_ilu_opt] scheduler_output: "
"total_num_scheduled_tokens=%s num_scheduled_tokens=%s "
"scheduled_new_reqs=%s scheduled_cached_reqs.req_ids=%s "
"resumed_req_ids=%s finished_req_ids=%s "
"has_meaningful_schedule=%s",
scheduler_output.total_num_scheduled_tokens,
scheduler_output.num_scheduled_tokens,
[r.req_id for r in scheduler_output.scheduled_new_reqs],
scheduler_output.scheduled_cached_reqs.req_ids,
scheduler_output.scheduled_cached_reqs.resumed_req_ids,
scheduler_output.finished_req_ids,
has_meaningful_schedule,
)
if has_meaningful_schedule:
exec_future = self.model_executor.execute_model(
scheduler_output, non_block=True
)
model_executed = (
scheduler_output.total_num_scheduled_tokens > 0
)
if not model_executed:
future = cast(Future[ModelRunnerOutput], exec_future)
else:
grammar_output = self.scheduler.get_grammar_bitmask(
scheduler_output
)
future = self.model_executor.sample_tokens(
grammar_output, non_block=True
)
if self.engine_core_input_queue.full():
scheduler_output_out, model_output_out, model_executed_out = (
self.engine_core_output_queue.get()
)
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output_out, model_output_out
)
self.engine_core_input_queue.put(
(future, scheduler_output)
)
return engine_core_outputs, model_executed_out
self.engine_core_input_queue.put((future, scheduler_output))
try:
scheduler_output, model_output, model_executed = (
self.engine_core_output_queue.get_nowait()
)
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, model_output
)
return engine_core_outputs, model_executed
except queue.Empty:
return None, False
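In short, the ILU-opt step is a three-stage pipeline: the main loop schedules and enqueues (future, scheduler_output) pairs, a daemon thread blocks on future.result(), and the main loop drains completed steps without blocking. A self-contained sketch of that shape, with stand-ins for the executor and model (all names below are illustrative):

```python
import queue
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor

QUEUE_SIZE = 2
input_q: "queue.Queue[tuple[Future, int]]" = queue.Queue(maxsize=QUEUE_SIZE)
output_q: "queue.Queue[tuple[int, str]]" = queue.Queue(maxsize=QUEUE_SIZE)

def worker_loop() -> None:
    # Mirrors _process_batch_queue_loop: block on future.result() off the
    # main thread so the main loop never waits on model execution.
    while True:
        future, step_id = input_q.get()
        output_q.put((step_id, future.result()))

threading.Thread(target=worker_loop, daemon=True).start()

def fake_model_step(step_id: int) -> str:
    time.sleep(0.05)  # stand-in for GPU execution
    return f"tokens-{step_id}"

num_steps, num_done = 4, 0
with ThreadPoolExecutor(max_workers=1) as pool:
    for step_id in range(num_steps):
        # "Schedule" a batch and submit it without blocking the main loop.
        input_q.put((pool.submit(fake_model_step, step_id), step_id))
        try:
            # Non-blocking drain, like the get_nowait() path in
            # step_with_batch_queue_ilu_opt: return early if nothing is ready.
            print("finished", output_q.get_nowait())
            num_done += 1
        except queue.Empty:
            pass
    while num_done < num_steps:  # drain the remaining completed steps
        print("finished", output_q.get())
        num_done += 1
```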
def _process_aborts_queue(self):
if not self.aborts_queue.empty():
request_ids = []
@@ -753,11 +935,22 @@ class EngineCore:
self.structured_output_manager.grammar_init(req)
return req, request.current_wave
def _eep_scale_up_before_kv_init(self):
raise NotImplementedError
def _eep_send_engine_core_notification(
self,
notification_type: EEPNotificationType,
vllm_config: VllmConfig | None = None,
):
raise NotImplementedError
class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process."""
ENGINE_CORE_DEAD = b"ENGINE_CORE_DEAD"
addresses: EngineZmqAddresses
@instrument(span_name="EngineCoreProc init")
def __init__(
@@ -808,6 +1001,13 @@ class EngineCoreProc(EngineCore):
# and "hybrid" LB modes.
self.publish_dp_lb_stats = internal_dp_balancing
self.addresses = addresses
self.process_input_queue_block = True
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self._eep_send_engine_core_notification(
EEPNotificationType.NEW_CORE_ENGINES_INIT_READY,
vllm_config=vllm_config,
)
self._init_data_parallel(vllm_config)
super().__init__(
@@ -1120,8 +1320,14 @@ class EngineCoreProc(EngineCore):
if logger.isEnabledFor(DEBUG):
logger.debug("EngineCore waiting for work.")
waited = True
req = self.input_queue.get()
self._handle_client_request(*req)
block = self.process_input_queue_block
try:
req = self.input_queue.get(block=block)
self._handle_client_request(*req)
except queue.Empty:
break
if not block:
break
if waited:
logger.debug("EngineCore loop active.")
@@ -1291,6 +1497,11 @@ class EngineCoreProc(EngineCore):
for input_socket, _ in poller.poll():
# (RequestType, RequestData)
type_frame, *data_frames = input_socket.recv_multipart(copy=False)
# NOTE(yongji): ignore the READY message that the DP coordinator
# sends to notify newly started engines
if type_frame.buffer == b"READY":
assert input_socket == coord_socket
continue
request_type = EngineCoreRequestType(bytes(type_frame.buffer))
# Deserialize the request data.
@@ -1489,6 +1700,10 @@ class DPEngineCoreProc(EngineCoreProc):
self.current_wave = 0
self.last_counts = (0, 0)
from vllm.distributed.elastic_ep.elastic_state import ElasticEPScalingState
self.eep_scaling_state: ElasticEPScalingState | None = None
# Initialize the engine.
dp_rank = vllm_config.parallel_config.data_parallel_rank
super().__init__(
@@ -1512,7 +1727,9 @@ class DPEngineCoreProc(EngineCoreProc):
assert 0 <= local_dp_rank <= dp_rank < dp_size
self.dp_rank = dp_rank
self.dp_group = vllm_config.parallel_config.stateless_init_dp_group()
self.dp_group, self.dp_store = (
vllm_config.parallel_config.stateless_init_dp_group(return_store=True)
)
def shutdown(self):
super().shutdown()
@@ -1533,7 +1750,11 @@ class DPEngineCoreProc(EngineCoreProc):
def resume_scheduler(self):
super().resume_scheduler()
if not self.engines_running and self.scheduler.has_unfinished_requests():
if (
self.has_coordinator
and not self.engines_running
and self.scheduler.has_unfinished_requests()
):
# Wake up other DP engines.
self.output_queue.put_nowait(
(-1, EngineCoreOutputs(start_wave=self.current_wave))
@@ -1575,7 +1796,12 @@ class DPEngineCoreProc(EngineCoreProc):
# 1) Poll the input queue until there is work to do.
self._process_input_queue()
# 2) Step the engine core.
if self.eep_scaling_state is not None:
_ = self.eep_scaling_state.progress()
if self.eep_scaling_state.is_complete():
self.process_input_queue_block = True
self.eep_scaling_state = None
executed = self._process_engine_step()
self._maybe_publish_request_counts()
@@ -1625,54 +1851,129 @@ class DPEngineCoreProc(EngineCoreProc):
def reinitialize_distributed(
self, reconfig_request: ReconfigureDistributedRequest
) -> None:
stateless_destroy_torch_distributed_process_group(self.dp_group)
self.shutdown()
from copy import deepcopy
parallel_config = self.vllm_config.parallel_config
old_dp_size = parallel_config.data_parallel_size
parallel_config.data_parallel_size = reconfig_request.new_data_parallel_size
if reconfig_request.new_data_parallel_rank != -1:
parallel_config.data_parallel_rank = reconfig_request.new_data_parallel_rank
# local rank specifies device visibility, it should not be changed
assert (
reconfig_request.new_data_parallel_rank_local
== ReconfigureRankType.KEEP_CURRENT_RANK
)
parallel_config.data_parallel_master_ip = (
reconfig_request.new_data_parallel_master_ip
)
parallel_config.data_parallel_master_port = (
reconfig_request.new_data_parallel_master_port
)
if reconfig_request.new_data_parallel_rank != -2:
self.dp_rank = parallel_config.data_parallel_rank
self.dp_group = parallel_config.stateless_init_dp_group()
reconfig_request.new_data_parallel_master_port = (
parallel_config.data_parallel_master_port
)
from vllm.distributed.elastic_ep.elastic_state import ElasticEPScalingState
self.model_executor.reinitialize_distributed(reconfig_request)
if reconfig_request.new_data_parallel_size > old_dp_size:
assert self.available_gpu_memory_for_kv_cache > 0
# pass available_gpu_memory_for_kv_cache from existing
# engine-cores to new engine-cores so they can directly
# use it in _initialize_kv_caches() rather than profiling.
ParallelConfig.sync_kv_cache_memory_size(
self.dp_group, self.available_gpu_memory_for_kv_cache
)
# NOTE(yongji): newly joined workers require dummy_run even
# CUDA graph is not used
self.model_executor.collective_rpc("compile_or_warm_up_model")
new_parallel_config = deepcopy(self.vllm_config.parallel_config)
old_dp_size = new_parallel_config.data_parallel_size
new_parallel_config.data_parallel_size = reconfig_request.new_data_parallel_size
if (
reconfig_request.new_data_parallel_rank
== ReconfigureRankType.SHUTDOWN_CURRENT_RANK
!= ReconfigureRankType.KEEP_CURRENT_RANK
):
self.shutdown()
logger.info("DPEngineCoreProc %s shutdown", self.dp_rank)
else:
logger.info(
"Distributed environment reinitialized for DP rank %s", self.dp_rank
new_parallel_config.data_parallel_rank = (
reconfig_request.new_data_parallel_rank
)
new_parallel_config.data_parallel_master_ip = (
reconfig_request.new_data_parallel_master_ip
)
new_parallel_config.data_parallel_master_port = (
reconfig_request.new_data_parallel_master_port
)
new_parallel_config._data_parallel_master_port_list = (
reconfig_request.new_data_parallel_master_port_list
)
is_scale_down = reconfig_request.new_data_parallel_size < old_dp_size
is_shutdown = (
reconfig_request.new_data_parallel_rank
== ReconfigureRankType.SHUTDOWN_CURRENT_RANK
)
self.eep_scaling_state = ElasticEPScalingState(
model_executor=self.model_executor,
engine_core=self,
vllm_config=self.vllm_config,
new_parallel_config=new_parallel_config,
worker_type="removing" if is_shutdown else "existing",
scale_type="scale_down" if is_scale_down else "scale_up",
reconfig_request=reconfig_request,
)
self.process_input_queue_block = False
logger.info(
"[Elastic EP] Received reconfiguration request and starting scaling up/down"
)
def _eep_send_engine_core_notification(
self,
notification_type: EEPNotificationType,
vllm_config: VllmConfig | None = None,
):
"""
Send notifications to EngineCoreClient, which can then forward
the notifications to other engine core processes. It is used for:
1) Scale up: new core engines notify existing core engines
that they are ready;
2) Scale down: core engines being removed notify EngineCoreClient
so it can release their Ray placement groups;
3) Both scale up and down: notify EngineCoreClient that existing
core engines have already switched to the new parallel setup.
"""
if vllm_config is None:
dp_rank = self.vllm_config.parallel_config.data_parallel_rank
else:
dp_rank = vllm_config.parallel_config.data_parallel_rank
notification_data = (notification_type.value, dp_rank)
outputs = EngineCoreOutputs(
utility_output=UtilityOutput(
call_id=EEP_NOTIFICATION_CALL_ID,
result=UtilityResult(notification_data),
)
)
outputs.engine_index = self.engine_index
if hasattr(self, "output_thread") and self.output_thread.is_alive():
self.output_queue.put_nowait((0, outputs))
else:
encoder = MsgpackEncoder()
with (
zmq.Context() as ctx,
make_zmq_socket(
ctx, self.addresses.outputs[0], zmq.PUSH, linger=4000
) as socket,
):
socket.send_multipart(encoder.encode(outputs))
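These notifications reuse the engine's utility-output path; the sentinel EEP_NOTIFICATION_CALL_ID = -1 is what lets the client tell a notification apart from a normal utility RPC reply. A minimal routing sketch under that assumption (the helper below is illustrative, not the client's real dispatch code):

```python
import enum

EEP_NOTIFICATION_CALL_ID = -1  # sentinel call id, as introduced in the diff

class EEPNotificationType(enum.Enum):
    NEW_CORE_ENGINES_INIT_READY = "NEW_CORE_ENGINES_INIT_READY"
    NEW_CORE_ENGINES_WEIGHTS_INIT_READY = "NEW_CORE_ENGINES_WEIGHTS_INIT_READY"
    RECONFIGURE_FINISHED = "RECONFIGURE_FINISHED"
    SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"

def route_utility_output(call_id: int, result) -> str:
    # Notifications travel on the same utility-output channel as RPC replies;
    # the sentinel call id tells the client to treat the payload as a
    # (notification_type, dp_rank) pair instead of resolving a pending future.
    if call_id == EEP_NOTIFICATION_CALL_ID:
        notification_type, dp_rank = result
        return (f"notification {EEPNotificationType(notification_type).name} "
                f"from dp_rank {dp_rank}")
    return f"utility result for call {call_id}"

print(route_utility_output(EEP_NOTIFICATION_CALL_ID,
                           ("NEW_CORE_ENGINES_WEIGHTS_INIT_READY", 2)))
print(route_utility_output(7, "ok"))
```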
def eep_handle_engine_core_notification(
self, notification_type: str | EEPNotificationType
):
"""
Handle notification received from EngineCoreClient
(forwarded from new core engines).
"""
assert self.eep_scaling_state is not None
if isinstance(notification_type, str):
notification_type = EEPNotificationType(notification_type)
self.eep_scaling_state.handle_notification(notification_type)
def _eep_scale_up_before_kv_init(self):
from vllm.distributed.elastic_ep.elastic_state import ElasticEPScalingState
self.eep_scaling_state = ElasticEPScalingState(
model_executor=self.model_executor,
engine_core=self,
vllm_config=self.vllm_config,
new_parallel_config=self.vllm_config.parallel_config,
worker_type="new",
scale_type="scale_up",
reconfig_request=None,
)
self.model_executor.collective_rpc("init_device")
self.model_executor.collective_rpc("load_model")
self._eep_send_engine_core_notification(
EEPNotificationType.NEW_CORE_ENGINES_WEIGHTS_INIT_READY
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("receive_weights",)
)
self.available_gpu_memory_for_kv_cache = (
ParallelConfig.sync_kv_cache_memory_size(self.dp_group, -1)
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("prepare_new_worker",)
)
self.process_input_queue_block = False
class EngineCoreActorMixin:

View File

@@ -28,11 +28,12 @@ from vllm.tracing import instrument
from vllm.utils.async_utils import in_loop
from vllm.utils.network_utils import (
close_sockets,
get_open_port,
get_open_zmq_inproc_path,
make_zmq_socket,
)
from vllm.v1.engine import (
EEP_NOTIFICATION_CALL_ID,
EEPNotificationType,
EngineCoreOutputs,
EngineCoreRequest,
EngineCoreRequestType,
@@ -47,6 +48,7 @@ from vllm.v1.engine.exceptions import EngineDeadError
from vllm.v1.engine.utils import (
CoreEngineActorManager,
CoreEngineProcManager,
get_engine_zmq_addresses,
launch_core_engines,
)
from vllm.v1.executor import Executor
@@ -445,6 +447,63 @@ class BackgroundResources:
raise EngineDeadError()
@dataclass
class ElasticScalingCache:
existing_core_engines: list[EngineIdentity]
num_new_core_engines: int
pending_notifications: dict[EEPNotificationType, set[int]]
def allocate_stateless_group_ports(parallel_config, new_data_parallel_size: int):
"""
Allocate stateless group ports for elastic EP.
"""
from vllm.utils.network_utils import get_open_ports_list
assert parallel_config.enable_elastic_ep, "Elastic EP must be enabled"
world_size = parallel_config.world_size
new_world_size_across_dp = world_size * new_data_parallel_size
num_world_groups = 1
num_dp_groups = max(1, new_world_size_across_dp // new_data_parallel_size)
num_ep_groups = max(
1,
new_world_size_across_dp
// (new_data_parallel_size * parallel_config.tensor_parallel_size),
)
num_eplb_groups = num_ep_groups
total_ports_needed = (
num_world_groups + num_dp_groups + num_ep_groups + num_eplb_groups
) * 3 + 5
all_ports = get_open_ports_list(total_ports_needed)
new_data_parallel_master_port_list = all_ports[-5:]
all_ports = all_ports[:-5]
new_stateless_world_group_port_list = [
all_ports[i : i + 3] for i in range(0, num_world_groups * 3, 3)
]
start_idx = num_world_groups * 3
new_stateless_dp_group_port_list = [
all_ports[i : i + 3] for i in range(start_idx, start_idx + num_dp_groups * 3, 3)
]
start_idx += num_dp_groups * 3
new_stateless_ep_group_port_list = [
all_ports[i : i + 3] for i in range(start_idx, start_idx + num_ep_groups * 3, 3)
]
start_idx += num_ep_groups * 3
new_stateless_eplb_group_port_list = [
all_ports[i : i + 3]
for i in range(start_idx, start_idx + num_eplb_groups * 3, 3)
]
parallel_config._stateless_world_group_port_list = (
new_stateless_world_group_port_list
)
parallel_config._stateless_dp_group_port_list = new_stateless_dp_group_port_list
parallel_config._stateless_ep_group_port_list = new_stateless_ep_group_port_list
parallel_config._stateless_eplb_group_port_list = new_stateless_eplb_group_port_list
parallel_config.data_parallel_master_port = new_data_parallel_master_port_list.pop()
parallel_config._data_parallel_master_port_list = new_data_parallel_master_port_list
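The budget is three ports per stateless group (world, DP, EP, EPLB) plus five extra for the data-parallel master port list, one of which becomes data_parallel_master_port. A small helper to sanity-check the arithmetic (assuming, for example, TP=2, world_size=2, scaling to DP=4):

```python
def stateless_port_budget(world_size: int, tensor_parallel_size: int,
                          new_data_parallel_size: int) -> dict:
    # Mirrors the arithmetic in allocate_stateless_group_ports():
    # 3 ports per group (world / dp / ep / eplb), plus 5 for the DP master
    # port list.
    new_world_size_across_dp = world_size * new_data_parallel_size
    num_world_groups = 1
    num_dp_groups = max(1, new_world_size_across_dp // new_data_parallel_size)
    num_ep_groups = max(
        1,
        new_world_size_across_dp
        // (new_data_parallel_size * tensor_parallel_size),
    )
    num_eplb_groups = num_ep_groups
    total = (num_world_groups + num_dp_groups
             + num_ep_groups + num_eplb_groups) * 3 + 5
    return {
        "world_groups": num_world_groups,
        "dp_groups": num_dp_groups,
        "ep_groups": num_ep_groups,
        "eplb_groups": num_eplb_groups,
        "total_ports_needed": total,
    }

# Example: TP=2, world_size=2, scaling to DP=4 -> (1 + 2 + 1 + 1) * 3 + 5 = 20.
print(stateless_port_budget(world_size=2, tensor_parallel_size=2,
                            new_data_parallel_size=4))
```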
class MPClient(EngineCoreClient):
"""
MPClient: base client for multi-proc EngineCore.
@@ -491,32 +550,37 @@ class MPClient(EngineCoreClient):
input_address = client_addresses["input_address"]
output_address = client_addresses["output_address"]
self.stats_update_address = client_addresses.get("stats_update_address")
self.input_socket = self.resources.input_socket = make_zmq_socket(
self.ctx, input_address, zmq.ROUTER, bind=True
)
self.resources.output_socket = make_zmq_socket(
self.ctx, output_address, zmq.PULL
)
else:
# Engines are managed by this client.
with launch_core_engines(vllm_config, executor_class, log_stats) as (
engine_manager,
coordinator,
addresses = get_engine_zmq_addresses(vllm_config)
self.input_socket = self.resources.input_socket = make_zmq_socket(
self.ctx, addresses.inputs[0], zmq.ROUTER, bind=True
)
self.resources.output_socket = make_zmq_socket(
self.ctx, addresses.outputs[0], zmq.PULL
)
with launch_core_engines(
vllm_config,
executor_class,
log_stats,
addresses,
):
) as (engine_manager, coordinator, addresses):
self.resources.coordinator = coordinator
self.resources.engine_manager = engine_manager
(input_address,) = addresses.inputs
(output_address,) = addresses.outputs
self.stats_update_address = addresses.frontend_stats_publish_address
if coordinator is not None:
assert self.stats_update_address == (
coordinator.get_stats_publish_address()
)
# Create input and output sockets.
self.input_socket = self.resources.input_socket = make_zmq_socket(
self.ctx, input_address, zmq.ROUTER, bind=True
)
self.resources.output_socket = make_zmq_socket(
self.ctx, output_address, zmq.PULL
)
parallel_config = vllm_config.parallel_config
dp_size = parallel_config.data_parallel_size
dp_rank = parallel_config.data_parallel_index
@@ -545,8 +609,13 @@ class MPClient(EngineCoreClient):
timeout=VLLM_ENGINE_READY_TIMEOUT_S * 1000 # convert to ms
):
raise TimeoutError(
"Timed out waiting for engines to send "
"initial message on input socket."
f"Timed out waiting for engine core processes to "
f"start. This is often caused by slow weight loading "
f"for large models. Waited "
f"{VLLM_ENGINE_READY_TIMEOUT_S}s (configured by "
f"VLLM_ENGINE_READY_TIMEOUT_S). To increase the "
f"timeout, set the environment variable: "
f"VLLM_ENGINE_READY_TIMEOUT_S=<seconds>"
)
identity, _ = sync_input_socket.recv_multipart()
identities.remove(identity)
@@ -877,6 +946,10 @@ class AsyncMPClient(MPClient):
output_socket = resources.output_socket
assert output_socket is not None
notification_callback_handler: (
Callable[[AsyncMPClient, Sequence[Any]], Any] | None
) = getattr(self.__class__, "eep_process_engine_core_notification", None)
async def process_outputs_socket():
try:
while True:
@@ -884,7 +957,26 @@ class AsyncMPClient(MPClient):
resources.validate_alive(frames)
outputs: EngineCoreOutputs = decoder.decode(frames)
if outputs.utility_output:
_process_utility_output(outputs.utility_output, utility_results)
if (
outputs.utility_output.call_id == EEP_NOTIFICATION_CALL_ID
and notification_callback_handler is not None
):
assert _self_ref is not None
_self = _self_ref()
if not _self:
return
if outputs.utility_output.result is None:
continue
notification_data = outputs.utility_output.result.result
assert isinstance(notification_data, Sequence)
assert len(notification_data) == 2
asyncio.create_task(
notification_callback_handler(_self, notification_data)
)
else:
_process_utility_output(
outputs.utility_output, utility_results
)
continue
if output_handler is not None:
@@ -1081,6 +1173,8 @@ class DPAsyncMPClient(AsyncMPClient):
# Used only by DPLBAsyncMPClient subclass.
self.lb_engines: list[list[int]] = [[0, 0] for _ in self.core_engines]
self.eep_scaling_cache: ElasticScalingCache | None = None
self.first_req_sock_addr = get_open_zmq_inproc_path()
self.first_req_send_socket = self.resources.first_req_send_socket = (
make_zmq_socket(self.ctx, self.first_req_sock_addr, zmq.PAIR, bind=True)
@@ -1101,12 +1195,6 @@ class DPAsyncMPClient(AsyncMPClient):
assert self.stats_update_address is not None
stats_addr: str = self.stats_update_address
assert len(self.engine_ranks_managed) > 0
# NOTE: running and waiting counts are all global from
# the Coordinator include all global EngineCores. This
# slice includes just the cores managed by this client.
count_slice = slice(
self.engine_ranks_managed[0], self.engine_ranks_managed[-1] + 1
)
async def run_engine_stats_update_task():
with (
@@ -1145,6 +1233,29 @@ class DPAsyncMPClient(AsyncMPClient):
):
# Extract new engine count from the decoded message
new_engine_count = decoded[1]
# Update engine_ranks_managed and count_slice
parallel_config = self.vllm_config.parallel_config
dp_size = parallel_config.data_parallel_size
dp_rank = parallel_config.data_parallel_rank
assert dp_rank == 0
assert dp_size == new_engine_count
assert not (
parallel_config.data_parallel_hybrid_lb
or parallel_config.data_parallel_external_lb
)
num_ranks = dp_size
self.engine_ranks_managed = list(
range(dp_rank, dp_rank + num_ranks)
)
if len(self.lb_engines) < new_engine_count:
self.lb_engines = self.lb_engines + [
[0, 0]
for _ in range(
new_engine_count - len(self.lb_engines)
)
]
else:
self.lb_engines = self.lb_engines[:new_engine_count]
# Send scale up notification to coordinator
scale_msg = msgspec.msgpack.encode(
("SCALE_ELASTIC_EP", new_engine_count)
@@ -1178,6 +1289,11 @@ class DPAsyncMPClient(AsyncMPClient):
self.current_wave = wave
self.engines_running = running
if counts is not None:
# Running and waiting counts are global from the
# Coordinator including all EngineCores. Slice to get
# just the cores managed by this client.
ranks = self.engine_ranks_managed
count_slice = slice(ranks[0], ranks[-1] + 1)
sliced_counts = counts[count_slice]
self.lb_engines = sliced_counts
logger.debug(
@@ -1287,6 +1403,67 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
for req_id in outputs.finished_requests:
self.reqs_in_flight.pop(req_id, None)
@staticmethod
async def eep_process_engine_core_notification(
self: "DPLBAsyncMPClient", notification_data: tuple[str, int]
):
cache = self.eep_scaling_cache
notification_type_str, dp_rank = notification_data
try:
notification_type = EEPNotificationType(notification_type_str)
except ValueError as e:
raise ValueError(
f"Unknown EEP notification type: {notification_type_str}"
) from e
if notification_type == EEPNotificationType.RECONFIGURE_FINISHED:
from vllm.v1.engine import UtilityResult
# NOTE(yongji): process a dummy UtilityOutput to resolve the future
# awaited in _eep_wait_for_setup_switch_complete(), signaling that
# all engine cores have completed reconfiguration.
dummy_output = UtilityOutput(
call_id=EEP_NOTIFICATION_CALL_ID, result=UtilityResult(None)
)
_process_utility_output(dummy_output, self.utility_results)
return
assert cache is not None
if notification_type not in cache.pending_notifications:
cache.pending_notifications[notification_type] = set()
if dp_rank in cache.pending_notifications[notification_type]:
raise ValueError(
f"Duplicate notification {notification_type} from dp_rank {dp_rank}"
)
cache.pending_notifications[notification_type].add(dp_rank)
if len(cache.pending_notifications[notification_type]) >= abs(
cache.num_new_core_engines
):
if notification_type == EEPNotificationType.SHUTDOWN_COMPLETE:
assert isinstance(self.resources.engine_manager, CoreEngineActorManager)
assert cache.num_new_core_engines < 0
old_dp_size = len(cache.existing_core_engines)
new_dp_size = old_dp_size + cache.num_new_core_engines
self.resources.engine_manager.scale_down_elastic_ep(
old_dp_size, new_dp_size
)
else:
await asyncio.gather(
*[
self._call_utility_async(
"eep_handle_engine_core_notification",
notification_type,
engine=engine,
)
for engine in cache.existing_core_engines
]
)
cache.pending_notifications[notification_type] = set()
if notification_type in [
EEPNotificationType.SHUTDOWN_COMPLETE,
EEPNotificationType.NEW_CORE_ENGINES_WEIGHTS_INIT_READY,
]:
self.eep_scaling_cache = None
async def abort_requests_async(self, request_ids: list[str]) -> None:
if not request_ids or self.resources.engine_dead:
return
@@ -1333,6 +1510,20 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
cur_data_parallel_size, new_data_parallel_size
)
async def _eep_wait_for_setup_switch_complete(self) -> None:
"""
Wait for core engines to switch to the new setup.
In eep_process_engine_core_notification(), a dummy UtilityOutput with
EEP_NOTIFICATION_CALL_ID will be set when RECONFIGURE_FINISHED
notification is received from engine 0. We create a future with
that call_id and wait for it to be resolved.
"""
future = asyncio.get_running_loop().create_future()
self.utility_results[EEP_NOTIFICATION_CALL_ID] = future
self._ensure_output_queue_task()
await future
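The wait itself is just an asyncio future registered under the sentinel call id and resolved by the notification handler. A minimal sketch of that resolve-from-another-task pattern (names are illustrative):

```python
import asyncio

EEP_NOTIFICATION_CALL_ID = -1

async def main() -> None:
    utility_results: dict[int, asyncio.Future] = {}

    async def wait_for_setup_switch_complete() -> None:
        # Register a future under the sentinel call id and wait on it.
        future = asyncio.get_running_loop().create_future()
        utility_results[EEP_NOTIFICATION_CALL_ID] = future
        await future
        print("all engine cores switched to the new setup")

    async def fake_notification_arrives() -> None:
        await asyncio.sleep(0.05)  # pretend RECONFIGURE_FINISHED arrives later
        utility_results.pop(EEP_NOTIFICATION_CALL_ID).set_result(None)

    await asyncio.gather(wait_for_setup_switch_complete(),
                         fake_notification_arrives())

asyncio.run(main())
```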
async def _scale_up_elastic_ep(
self, cur_data_parallel_size: int, new_data_parallel_size: int
) -> None:
@@ -1340,38 +1531,57 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
and reconfiguring existing ones."""
cur_data_parallel_size = len(self.core_engines)
# Phase 1: Send reconfigure messages to all existing engines and wait
# for them to be sent
self.eep_scaling_cache = ElasticScalingCache(
existing_core_engines=self.core_engines.copy(),
num_new_core_engines=new_data_parallel_size - cur_data_parallel_size,
pending_notifications=dict(),
)
parallel_config = self.vllm_config.parallel_config
allocate_stateless_group_ports(parallel_config, new_data_parallel_size)
# Phase 1: Send reconfig messages to existing engines
reconfig_futures = []
self.vllm_config.parallel_config.data_parallel_master_port = get_open_port()
for engine in self.core_engines:
reconfig_request = ReconfigureDistributedRequest(
new_data_parallel_size=new_data_parallel_size,
new_data_parallel_rank=ReconfigureRankType.KEEP_CURRENT_RANK,
new_data_parallel_rank_local=ReconfigureRankType.KEEP_CURRENT_RANK,
new_data_parallel_master_ip=self.vllm_config.parallel_config.data_parallel_master_ip,
new_data_parallel_master_port=self.vllm_config.parallel_config.data_parallel_master_port,
new_data_parallel_master_ip=parallel_config.data_parallel_master_ip,
new_data_parallel_master_port=parallel_config.data_parallel_master_port,
new_data_parallel_master_port_list=parallel_config._data_parallel_master_port_list,
new_stateless_world_group_port_list=parallel_config._stateless_world_group_port_list,
new_stateless_dp_group_port_list=parallel_config._stateless_dp_group_port_list,
new_stateless_ep_group_port_list=parallel_config._stateless_ep_group_port_list,
new_stateless_eplb_group_port_list=parallel_config._stateless_eplb_group_port_list,
)
coro = self._call_utility_async(
"reinitialize_distributed", reconfig_request, engine=engine
)
reconfig_futures.append(asyncio.create_task(coro))
logger.info("All reconfigure messages sent, starting engine creation")
# Phase 2: Create new engines now that reconfig messages have been sent
# self.resources.engine_manager is guaranteed to be
# CoreEngineActorManager for RayDPClient
# Phase 2: Create new engines
assert isinstance(self.resources.engine_manager, CoreEngineActorManager)
self.resources.engine_manager.scale_up_elastic_ep(
self.vllm_config, new_data_parallel_size
parallel_config.eplb_config.num_redundant_experts = 0
start_new_worker_future = asyncio.to_thread(
self.resources.engine_manager.scale_up_elastic_ep,
self.vllm_config,
new_data_parallel_size,
)
wait_future = self._eep_wait_for_setup_switch_complete()
# Phase 3: Wait for new engines to be created
# and reconfig messages to be received
await asyncio.gather(start_new_worker_future, *reconfig_futures)
logger.info("[Elastic EP] Successfully started new engines")
# Create new CoreEngine objects for the new engines
new_engine_identities = set()
for i in range(cur_data_parallel_size, new_data_parallel_size):
new_engine = i.to_bytes(2, "little")
self.core_engines.append(new_engine)
# NOTE(yongji): we don't update lb_engines here;
# run_engine_stats_update_task updates it.
new_engine_identities.add(new_engine)
# Wait for ready messages from new engines on the input socket
@@ -1381,16 +1591,21 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
timeout=VLLM_ENGINE_READY_TIMEOUT_S * 1000 # convert to ms
):
raise TimeoutError(
"Timed out waiting for new engines to send initial "
"message on input socket."
f"Timed out waiting for new engine core processes to "
f"start. Waited "
f"{VLLM_ENGINE_READY_TIMEOUT_S}s (configured by "
f"VLLM_ENGINE_READY_TIMEOUT_S). To increase the "
f"timeout, set the environment variable: "
f"VLLM_ENGINE_READY_TIMEOUT_S=<seconds>"
)
identity, _ = sync_input_socket.recv_multipart()
new_engine_identities.discard(identity)
# Phase 3: Wait for all existing engines to complete reconfiguration
logger.info("Waiting for existing engines to complete reconfiguration")
await asyncio.gather(*reconfig_futures)
# NOTE(yongji): Before we schedule any requests on the new workers,
# we should wait for them to switch to the new setup.
await wait_future
# Update the parallel config
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
# Notify coordinator about scale up through existing
# stats_update_task connection
self._ensure_stats_update_task()
@@ -1399,8 +1614,6 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
)
await self.first_req_send_socket.send(scale_up_marker)
# Update the parallel config
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
logger.info(
"[Elastic EP] Scale up completed, new data parallel size: %s",
new_data_parallel_size,
@@ -1413,7 +1626,14 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
reconfiguring existing engine cores."""
cur_data_parallel_size = len(self.core_engines)
self.vllm_config.parallel_config.data_parallel_master_port = get_open_port()
self.eep_scaling_cache = ElasticScalingCache(
existing_core_engines=self.core_engines.copy(),
num_new_core_engines=new_data_parallel_size - cur_data_parallel_size,
pending_notifications=dict(),
)
parallel_config = self.vllm_config.parallel_config
allocate_stateless_group_ports(parallel_config, new_data_parallel_size)
reconfig_futures = []
for cur_dp_rank, engine in enumerate(self.core_engines):
@@ -1421,8 +1641,13 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
new_data_parallel_size=new_data_parallel_size,
new_data_parallel_rank=ReconfigureRankType.KEEP_CURRENT_RANK,
new_data_parallel_rank_local=ReconfigureRankType.KEEP_CURRENT_RANK,
new_data_parallel_master_ip=self.vllm_config.parallel_config.data_parallel_master_ip,
new_data_parallel_master_port=self.vllm_config.parallel_config.data_parallel_master_port,
new_data_parallel_master_ip=parallel_config.data_parallel_master_ip,
new_data_parallel_master_port=parallel_config.data_parallel_master_port,
new_data_parallel_master_port_list=parallel_config._data_parallel_master_port_list,
new_stateless_world_group_port_list=parallel_config._stateless_world_group_port_list,
new_stateless_dp_group_port_list=parallel_config._stateless_dp_group_port_list,
new_stateless_ep_group_port_list=parallel_config._stateless_ep_group_port_list,
new_stateless_eplb_group_port_list=parallel_config._stateless_eplb_group_port_list,
)
if cur_dp_rank >= new_data_parallel_size:
reconfig_request.new_data_parallel_rank = (
@@ -1433,23 +1658,24 @@ class DPLBAsyncMPClient(DPAsyncMPClient):
)
reconfig_futures.append(asyncio.create_task(coro))
for _ in range(new_data_parallel_size, cur_data_parallel_size):
self.core_engines.pop()
# NOTE(yongji): Immediately stop sending requests to the engines being removed.
self.core_engines = self.core_engines[:new_data_parallel_size]
self.lb_engines = self.lb_engines[:new_data_parallel_size]
wait_future = self._eep_wait_for_setup_switch_complete()
await asyncio.gather(*reconfig_futures)
assert isinstance(self.resources.engine_manager, CoreEngineActorManager)
self.resources.engine_manager.scale_down_elastic_ep(
cur_data_parallel_size, new_data_parallel_size
)
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
self._ensure_stats_update_task()
scale_down_marker = msgspec.msgpack.encode(
("SCALE_ELASTIC_EP", new_data_parallel_size)
)
await self.first_req_send_socket.send(scale_down_marker)
self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size
# NOTE(yongji): Unlike scale up, we don't strictly need to wait for the
# setup switch to complete here; this wait may be removed in the future.
await wait_future
logger.info(
"[Elastic EP] Scale down completed, new data parallel size: %s",
new_data_parallel_size,

View File

@@ -2,7 +2,6 @@
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import time
import warnings
from collections.abc import Mapping
from typing import Any, Literal
@@ -114,16 +113,6 @@ class InputProcessor:
supported_tasks: tuple[SupportedTask, ...],
) -> None:
"""Raise `ValueError` if SamplingParams or PoolingParams is not valid."""
if params.truncate_prompt_tokens is not None:
params_type = type(params).__name__
warnings.warn(
f"The `truncate_prompt_tokens` parameter in `{params_type}` "
"is deprecated and will be removed in v0.17. "
"Please pass it via `tokenization_kwargs` instead.",
DeprecationWarning,
stacklevel=2,
)
if isinstance(params, SamplingParams):
supported_generation_tasks = [
task for task in supported_tasks if task in GENERATION_TASKS

View File

@@ -92,6 +92,7 @@ class LLMEngine:
self.renderer = renderer = renderer_from_config(self.vllm_config)
self.io_processor = get_io_processor(
self.vllm_config,
self.renderer,
self.model_config.io_processor_plugin,
)

View File

@@ -277,6 +277,8 @@ class CoreEngineActorManager:
else:
ray.init()
vllm_config.parallel_config.allocate_elastic_ep_ports()
if placement_groups is not None:
assert local_dp_ranks is not None, (
"local_dp_ranks must be provided if placement_groups is provided"
@@ -584,6 +586,8 @@ class CoreEngineActorManager:
node_ip = node.node_ip
node_id = node.node_id
if device_str not in available_resources[node_id]:
continue
available_gpus = int(available_resources[node_id][device_str])
# Get total GPUs on this node from the node's resources
@@ -773,11 +777,50 @@ class CoreEngineActorManager:
ray.util.remove_placement_group(pg)
def get_engine_zmq_addresses(
vllm_config: VllmConfig,
num_api_servers: int = 1,
) -> EngineZmqAddresses:
"""Allocate ZMQ addresses for engine-client communication."""
parallel_config = vllm_config.parallel_config
local_engine_count = parallel_config.data_parallel_size_local
local_start_index = parallel_config.data_parallel_rank_local
dp_size = parallel_config.data_parallel_size
host = parallel_config.data_parallel_master_ip
local_engines_only = parallel_config.local_engines_only
# In offline mode there is an LLM instance per DP rank and
# one core engine per LLM, see
# examples/offline_inference/data_parallel.py.
offline_mode = local_start_index is not None
# client_local_only = True for cases where this front-end
# sends requests only to colocated engines.
client_local_only = (
offline_mode or local_engines_only or (local_engine_count == dp_size)
)
# NOTE(yongji): handling scaling from intra-node to inter-node
if parallel_config.enable_elastic_ep:
client_local_only = False
return EngineZmqAddresses(
inputs=[
get_engine_client_zmq_addr(client_local_only, host)
for _ in range(num_api_servers)
],
outputs=[
get_engine_client_zmq_addr(client_local_only, host)
for _ in range(num_api_servers)
],
)
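client_local_only decides whether the client/engine endpoints may stay node-local or must be reachable over TCP; elastic EP forces the latter because a later scale-up may place new engines on other nodes. A rough sketch of just that decision (the helper is illustrative; the real code delegates address creation to get_engine_client_zmq_addr):

```python
def pick_zmq_scope(offline_mode: bool, local_engines_only: bool,
                   local_engine_count: int, dp_size: int,
                   enable_elastic_ep: bool) -> str:
    # Mirrors the client_local_only logic in get_engine_zmq_addresses().
    client_local_only = (
        offline_mode or local_engines_only or (local_engine_count == dp_size)
    )
    if enable_elastic_ep:
        # Scaling may go from intra-node to inter-node, so never assume the
        # client and the engines stay colocated.
        client_local_only = False
    return "ipc (local-only)" if client_local_only else "tcp (cross-node capable)"

# Example: all engines are local today, but elastic EP is enabled -> use TCP.
print(pick_zmq_scope(offline_mode=False, local_engines_only=False,
                     local_engine_count=4, dp_size=4, enable_elastic_ep=True))
```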
@contextlib.contextmanager
def launch_core_engines(
vllm_config: VllmConfig,
executor_class: type[Executor],
log_stats: bool,
addresses: EngineZmqAddresses,
num_api_servers: int = 1,
) -> Iterator[
tuple[
@@ -796,29 +839,8 @@ def launch_core_engines(
host = parallel_config.data_parallel_master_ip
local_engines_only = parallel_config.local_engines_only
# In offline mode there is an LLM instance per DP rank and
# one core engine per LLM, see
# examples/offline_inference/data_parallel.py.
offline_mode = local_start_index is not None
# client_local_only = True for cases where this front-end
# sends requests only to colocated engines.
client_local_only = (
offline_mode or local_engines_only or (local_engine_count == dp_size)
)
# Set up input and output addresses.
addresses = EngineZmqAddresses(
inputs=[
get_engine_client_zmq_addr(client_local_only, host)
for _ in range(num_api_servers)
],
outputs=[
get_engine_client_zmq_addr(client_local_only, host)
for _ in range(num_api_servers)
],
)
# Run the DP Coordinator process with rank 0 when in online DP mode.
# The coordinator is needed for:
# 1. Internal/hybrid LB: collecting and publishing queue stats for load balancing
@@ -885,6 +907,10 @@ def launch_core_engines(
# will be False.
handshake_local_only = offline_mode or local_engine_count == dp_size
# NOTE(yongji): handling scaling from intra-node to inter-node
if parallel_config.enable_elastic_ep:
handshake_local_only = False
handshake_address = get_engine_client_zmq_addr(
handshake_local_only, host, parallel_config.data_parallel_rpc_port
)