### What this PR does / why we need it?
**BUG**
When using prefill-decode disaggregation + MTP + full graph +
asynchronous scheduling, the KV cache pulled by decode nodes from
prefill nodes does not include spec tokens. As a result, the
total_num_scheduled_tokens obtained by decode nodes from the scheduler
lacks spec tokens. When determining whether to enqueue the full graph on
decode nodes, the uniform_decode condition
`scheduler_output.total_num_scheduled_tokens == self.input_batch.num_reqs * max_query_len`
is not met, so the current instance is not enqueued into the full
graph.
As a result, full-graph and eager-mode instances coexist among the
decode instances. Because MoeDispatch requires a synchronization wait,
the decode instances running in full graph are significantly slowed
down by the instances running in eager mode.
**Solution**
The scenario is PD separation + MTP + Full Graph + asynchronous
scheduling.
On the decode nodes, the spec tokens of a request whose KV cache comes
from the prefill (P) nodes need to be padded with placeholder tokens.
The padded spec tokens are then rejected during sampling. This ensures
that the uniform_decode condition is satisfied when deciding whether a
decode node runs in the full graph, thereby guaranteeing that all decode
instances run in the full graph and avoiding the synchronous wait in
MoeDispatch.
- vLLM version: v0.15.0
- vLLM main:
5326c89803
Signed-off-by: chenmenglong <chenmenglong1@huawei.com>
895 lines
43 KiB
Python
895 lines
43 KiB
Python
##
|
|
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# This file is a part of the vllm-ascend project.
|
|
# Adapted from vllm-project/vllm/vllm/v1/core/sched/scheduler.py
|
|
#
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, fields
|
|
|
|
import numpy as np
|
|
from vllm._bc_linter import bc_linter_include
|
|
from vllm.config import SchedulerConfig, VllmConfig
|
|
from vllm.distributed.ec_transfer.ec_connector.base import ECConnectorMetadata
|
|
from vllm.distributed.kv_events import KVEventBatch
|
|
from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorMetadata
|
|
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats
|
|
from vllm.logger import init_logger
|
|
from vllm.v1.core.kv_cache_manager import KVCacheBlocks
|
|
from vllm.v1.core.sched.async_scheduler import AsyncScheduler
|
|
from vllm.v1.core.sched.output import NewRequestData, SchedulerOutput
|
|
from vllm.v1.core.sched.request_queue import SchedulingPolicy, create_request_queue
|
|
from vllm.v1.core.sched.scheduler import Scheduler
|
|
from vllm.v1.core.sched.utils import remove_all
|
|
from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs, FinishReason
|
|
from vllm.v1.metrics.perf import PerfStats
|
|
from vllm.v1.outputs import ModelRunnerOutput
|
|
from vllm.v1.request import Request, RequestStatus
|
|
from vllm.v1.sample.rejection_sampler import PLACEHOLDER_TOKEN_ID
|
|
from vllm.v1.spec_decode.metrics import SpecDecodingStats
|
|
from vllm.v1.utils import ConstantList, record_function_or_nullcontext
|
|
|
|
logger = init_logger(__name__)
|
|
|
|
|
|
@dataclass
class RecomputeSchedulerConfig(SchedulerConfig):
    """Scheduler config that points vLLM at the recompute scheduler classes."""

    # Dotted path (or class object) of the scheduler implementation to use.
    scheduler_cls: str | type[object] = "vllm_ascend.core.recompute_scheduler.RecomputeScheduler"

    @classmethod
    def initialize_from_config(cls, vllm_config: VllmConfig):
        """Create a recompute scheduler config from an existing ``VllmConfig``.

        Every init-able field of ``vllm_config.scheduler_config`` is copied,
        then ``scheduler_cls`` is overridden with the async or sync recompute
        scheduler (depending on ``async_scheduling``), and ``max_model_len``
        and ``is_encoder_decoder`` are taken from ``vllm_config.model_config``.

        Args:
            vllm_config: The engine configuration to derive the values from.

        Returns:
            A new instance of ``cls`` built from the collected values.
        """
        base_config = vllm_config.scheduler_config

        # Only fields accepted by the dataclass __init__ can be forwarded.
        init_kwargs = {}
        for spec in fields(base_config):
            if not spec.init:
                continue
            init_kwargs[spec.name] = getattr(base_config, spec.name)

        init_kwargs["scheduler_cls"] = (
            "vllm_ascend.core.recompute_scheduler.AsyncRecomputeScheduler"
            if base_config.async_scheduling
            else "vllm_ascend.core.recompute_scheduler.RecomputeScheduler"
        )

        model_config = vllm_config.model_config
        init_kwargs["max_model_len"] = model_config.max_model_len
        init_kwargs["is_encoder_decoder"] = model_config.is_encoder_decoder
        return cls(**init_kwargs)
|
|
|
|
|
|
@dataclass
class RecomputeReqInfo:
    """Identifying data of a request that was dropped for recomputation."""

    # ID of the dropped request.
    request_id: str
    # Output tokens the request had produced when it was dropped.
    output_token_ids: ConstantList
    # Index of the client that the request's output is routed to.
    client_index: int = 0
|
|
|
|
|
|
@bc_linter_include
@dataclass
class RecomputeSchedulerOutput(SchedulerOutput):
    """``SchedulerOutput`` extended with the requests dropped for recomputation."""

    # Requests dropped during this scheduling step; None when not provided.
    recomputed_reqs: list[RecomputeReqInfo] | None = None
|
|
|
|
|
|
class RecomputeScheduler(Scheduler):
|
|
running: list[Request]
|
|
|
|
def __init__(self, *args, **kwargs):
    """Initialize the base scheduler and detect the spec-decode KV-consumer setup.

    All positional and keyword arguments are forwarded unchanged to the
    parent scheduler's ``__init__``.
    """
    super().__init__(*args, **kwargs)
    # When is_mtp_kv_consumer is true, we will fill request.spec_token_ids
    # with placeholder tokens to enable full graph when decode nodes pull
    # the KV cache of one request from prefill nodes.
    # The `and` chain would otherwise leave None or a config object in the
    # attribute, so it is coerced to a real bool.
    self.is_mtp_kv_consumer = bool(
        self.vllm_config.speculative_config
        and self.vllm_config.kv_transfer_config
        and self.vllm_config.kv_transfer_config.is_kv_consumer
    )
|
|
|
|
def add_request(self, request: Request) -> None:
    """Register ``request`` and put it on the waiting queue.

    When ``self.is_mtp_kv_consumer`` is set, the request's
    ``spec_token_ids`` is first replaced by ``self.num_spec_tokens``
    placeholder tokens. A QUEUED event is recorded when ``self.log_stats``
    is enabled.

    Args:
        request: The request to enqueue.
    """
    if self.is_mtp_kv_consumer:
        # Placeholder draft tokens keep the request compatible with full
        # graph; without them graph matching may fail and force eager mode.
        placeholder_drafts = [PLACEHOLDER_TOKEN_ID for _ in range(self.num_spec_tokens)]
        request.spec_token_ids = placeholder_drafts

    self.waiting.add_request(request)
    self.requests[request.request_id] = request

    if not self.log_stats:
        return
    request.record_event(EngineCoreEventType.QUEUED)
|
|
|
|
def schedule(self) -> RecomputeSchedulerOutput:
    """Run one scheduling step and return what the model runner should execute.

    RUNNING requests are scheduled first. If KV blocks cannot be allocated
    for one of them, the behavior depends on ``kv_transfer_config``: when a
    transfer config exists and this instance is not a KV producer, the last
    running request is popped, its KV blocks are freed and it is reported
    through ``recomputed_reqs``; otherwise a running request is preempted.
    WAITING requests are only considered when nothing was preempted or
    recomputed in this step. When ``self.is_mtp_kv_consumer`` is set,
    waiting requests are scheduled together with their spec tokens.

    Returns:
        A ``RecomputeSchedulerOutput`` describing the scheduled requests,
        token counts, spec-decode tokens, encoder inputs and the requests
        dropped for recomputation, with connector metadata attached when a
        connector is configured.
    """
    # NOTE(woosuk) on the scheduling algorithm:
    # There's no "decoding phase" nor "prefill phase" in the scheduler.
    # Each request just has the num_computed_tokens and
    # num_tokens_with_spec. num_tokens_with_spec =
    # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids).
    # At each step, the scheduler tries to assign tokens to the requests
    # so that each request's num_computed_tokens can catch up its
    # num_tokens_with_spec. This is general enough to cover
    # chunked prefills, prefix caching, speculative decoding,
    # and the "jump decoding" optimization in the future.

    scheduled_new_reqs: list[Request] = []
    scheduled_resumed_reqs: list[Request] = []
    scheduled_running_reqs: list[Request] = []
    preempted_reqs: list[Request] = []
    recomputed_reqs: list[RecomputeReqInfo] = []

    req_to_new_blocks: dict[str, KVCacheBlocks] = {}
    num_scheduled_tokens: dict[str, int] = {}
    token_budget = self.max_num_scheduled_tokens
    # Encoder-related.
    scheduled_encoder_inputs: dict[str, list[int]] = {}
    encoder_compute_budget = self.max_num_encoder_input_tokens
    # Spec decode-related.
    scheduled_spec_decode_tokens: dict[str, list[int]] = {}

    # For logging.
    scheduled_timestamp = time.monotonic()

    # First, schedule the RUNNING requests.
    req_index = 0
    while req_index < len(self.running) and token_budget > 0:
        request = self.running[req_index]

        if (
            request.num_output_placeholders > 0
            # This is (num_computed_tokens + 1) - (num_output_placeholders - 1).
            # Since output placeholders are also included in the computed tokens
            # count, we subtract (num_output_placeholders - 1) to remove any draft
            # tokens, so that we can be sure no further steps are needed even if
            # they are all rejected.
            and request.num_computed_tokens + 2 - request.num_output_placeholders
            >= request.num_prompt_tokens + request.max_tokens
        ):
            # Async scheduling: Avoid scheduling an extra step when we are sure that
            # the previous step has reached request.max_tokens. We don't schedule
            # partial draft tokens since this prevents uniform decode optimizations.
            req_index += 1
            continue

        num_new_tokens = (
            request.num_tokens_with_spec + request.num_output_placeholders - request.num_computed_tokens
        )
        if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens:
            num_new_tokens = self.scheduler_config.long_prefill_token_threshold
        num_new_tokens = min(num_new_tokens, token_budget)

        # Make sure the input position does not exceed the max model len.
        # This is necessary when using spec decoding.
        num_new_tokens = min(num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens)

        # Schedule encoder inputs.
        encoder_inputs_to_schedule = None
        external_load_encoder_input: list[int] = []
        new_encoder_compute_budget = encoder_compute_budget
        if request.has_encoder_inputs:
            (
                encoder_inputs_to_schedule,
                num_new_tokens,
                new_encoder_compute_budget,
                external_load_encoder_input,
            ) = self._try_schedule_encoder_inputs(
                request,
                request.num_computed_tokens,
                num_new_tokens,
                encoder_compute_budget,
                shift_computed_tokens=1 if self.use_eagle else 0,
            )

        if num_new_tokens == 0:
            # The request cannot be scheduled because one of the following
            # reasons:
            # 1. No new tokens to schedule. This may happen when
            #    (1) PP>1 and we have already scheduled all prompt tokens
            #    but they are not finished yet.
            #    (2) Async scheduling and the request has reached to either
            #    its max_total_tokens or max_model_len.
            # 2. The encoder budget is exhausted.
            # 3. The encoder cache is exhausted.
            # NOTE(woosuk): Here, by doing `continue` instead of `break`,
            # we do not strictly follow the FCFS scheduling policy and
            # allow the lower-priority requests to be scheduled.
            req_index += 1
            continue

        # Schedule newly needed KV blocks for the request.
        with record_function_or_nullcontext("schedule: allocate_slots"):
            while True:
                new_blocks = self.kv_cache_manager.allocate_slots(
                    request,
                    num_new_tokens,
                    num_lookahead_tokens=self.num_lookahead_tokens,
                )

                if new_blocks is not None:
                    # The request can be scheduled.
                    break

                # The request cannot be scheduled.
                # Preempt the lowest-priority request.
                # NOTE: We add the preempted_req to recomputed_reqs in kv_consumer to
                # drop the request to PD proxy.
                transfer_config = self.vllm_config.kv_transfer_config
                if transfer_config is not None and not transfer_config.is_kv_producer:
                    recomputed_req = self.running.pop()
                    self.kv_cache_manager.free(recomputed_req)
                    recomputed_reqs.append(
                        RecomputeReqInfo(
                            recomputed_req.request_id, recomputed_req.output_token_ids, recomputed_req.client_index
                        )
                    )
                    if recomputed_req == request:
                        # The current request itself was dropped; nothing
                        # left to free for it.
                        break
                else:
                    if self.policy == SchedulingPolicy.PRIORITY:
                        preempted_req = max(
                            self.running,
                            key=lambda r: (r.priority, r.arrival_time),
                        )
                        self.running.remove(preempted_req)
                        if preempted_req in scheduled_running_reqs:
                            # Undo everything that was recorded for the
                            # victim earlier in this step.
                            scheduled_running_reqs.remove(preempted_req)
                            token_budget += num_scheduled_tokens[preempted_req.request_id]
                            req_to_new_blocks.pop(preempted_req.request_id)
                            num_scheduled_tokens.pop(preempted_req.request_id)
                            scheduled_spec_decode_tokens.pop(preempted_req.request_id, None)
                            preempted_encoder_inputs = scheduled_encoder_inputs.pop(preempted_req.request_id, None)
                            if preempted_encoder_inputs:
                                # Restore encoder compute budget if the preempted
                                # request had encoder inputs scheduled in this step.
                                num_embeds_to_restore = sum(
                                    preempted_req.get_num_encoder_embeds(i) for i in preempted_encoder_inputs
                                )
                                encoder_compute_budget += num_embeds_to_restore
                            req_index -= 1
                    else:
                        preempted_req = self.running.pop()

                    self._preempt_request(preempted_req, scheduled_timestamp)
                    preempted_reqs.append(preempted_req)
                    if preempted_req == request:
                        # No more request to preempt. Cannot schedule this request.
                        break

        if new_blocks is None:
            # Cannot schedule this request.
            break

        # Schedule the request.
        scheduled_running_reqs.append(request)
        req_to_new_blocks[request.request_id] = new_blocks
        num_scheduled_tokens[request.request_id] = num_new_tokens
        token_budget -= num_new_tokens
        req_index += 1

        # Speculative decode related.
        if request.spec_token_ids:
            num_scheduled_spec_tokens = (
                num_new_tokens + request.num_computed_tokens - request.num_tokens - request.num_output_placeholders
            )
            if num_scheduled_spec_tokens > 0:
                # Trim spec_token_ids list to num_scheduled_spec_tokens.
                del request.spec_token_ids[num_scheduled_spec_tokens:]
                scheduled_spec_decode_tokens[request.request_id] = request.spec_token_ids
            # New spec tokens will be set in `update_draft_token_ids` before the
            # next step when applicable.
            request.spec_token_ids = []

        # Encoder-related.
        if encoder_inputs_to_schedule:
            scheduled_encoder_inputs[request.request_id] = encoder_inputs_to_schedule
            # Allocate the encoder cache.
            for i in encoder_inputs_to_schedule:
                self.encoder_cache_manager.allocate(request, i)
            encoder_compute_budget = new_encoder_compute_budget
        if external_load_encoder_input:
            for i in external_load_encoder_input:
                self.encoder_cache_manager.allocate(request, i)
                if self.ec_connector is not None:
                    self.ec_connector.update_state_after_alloc(request, i)

    # Record the LoRAs in scheduled_running_reqs
    scheduled_loras: set[int] = set()
    if self.lora_config:
        scheduled_loras = set(
            req.lora_request.lora_int_id
            for req in scheduled_running_reqs
            if req.lora_request and req.lora_request.lora_int_id > 0
        )
        assert len(scheduled_loras) <= self.lora_config.max_loras

    # Use a temporary RequestQueue to collect requests that need to be
    # skipped and put back at the head of the waiting queue later
    skipped_waiting_requests = create_request_queue(self.policy)

    # Next, schedule the WAITING requests.
    if not preempted_reqs and not recomputed_reqs:
        while self.waiting and token_budget > 0:
            if len(self.running) == self.max_num_running_reqs:
                break

            request = self.waiting.peek_request()

            # KVTransfer: skip request if still waiting for remote kvs.
            if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:
                is_ready = self._update_waiting_for_remote_kv(request)
                if is_ready:
                    if request.num_preemptions:
                        # We must be loading for a resumed preemption
                        # rather than a new request.
                        request.status = RequestStatus.PREEMPTED
                    else:
                        request.status = RequestStatus.WAITING
                else:
                    logger.debug(
                        "%s is still in WAITING_FOR_REMOTE_KVS state.",
                        request.request_id,
                    )
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

            # Skip request if the structured output request is still waiting
            # for FSM compilation.
            if request.status == RequestStatus.WAITING_FOR_FSM:
                structured_output_req = request.structured_output_request
                if structured_output_req and structured_output_req.grammar:
                    request.status = RequestStatus.WAITING
                else:
                    self.waiting.pop_request()
                    skipped_waiting_requests.prepend_request(request)
                    continue

            # Check that adding the request still respects the max_loras
            # constraint.
            if (
                self.lora_config
                and request.lora_request
                and (
                    len(scheduled_loras) == self.lora_config.max_loras
                    and request.lora_request.lora_int_id not in scheduled_loras
                )
            ):
                # Scheduling would exceed max_loras, skip.
                self.waiting.pop_request()
                skipped_waiting_requests.prepend_request(request)
                continue

            num_external_computed_tokens = 0
            load_kv_async = False

            # Get already-cached tokens.
            if request.num_computed_tokens == 0:
                # Get locally-cached tokens.
                new_computed_blocks, num_new_local_computed_tokens = self.kv_cache_manager.get_computed_blocks(
                    request
                )

                # Get externally-cached tokens if using a KVConnector.
                if self.connector is not None:
                    ext_tokens, load_kv_async = self.connector.get_num_new_matched_tokens(
                        request, num_new_local_computed_tokens
                    )

                    if ext_tokens is None:
                        # The request cannot be scheduled because
                        # the KVConnector couldn't determine
                        # the number of matched tokens.
                        self.waiting.pop_request()
                        skipped_waiting_requests.prepend_request(request)
                        continue

                    request.num_external_computed_tokens = ext_tokens
                    num_external_computed_tokens = ext_tokens

                # Total computed tokens (local + external).
                num_computed_tokens = num_new_local_computed_tokens + num_external_computed_tokens
            else:
                # KVTransfer: WAITING reqs have num_computed_tokens > 0
                # after async KV recvs are completed.
                new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks
                num_new_local_computed_tokens = 0
                num_computed_tokens = request.num_computed_tokens

            encoder_inputs_to_schedule = None
            external_load_encoder_input = []
            new_encoder_compute_budget = encoder_compute_budget

            if load_kv_async:
                # KVTransfer: loading remote KV, do not allocate for new work.
                assert num_external_computed_tokens > 0
                num_new_tokens = 0
            else:
                # Number of tokens to be scheduled.
                # We use `request.num_tokens` instead of
                # `request.num_prompt_tokens` to consider the resumed
                # requests, which have output tokens.
                if self.is_mtp_kv_consumer:
                    # Include the spec tokens so the request is scheduled
                    # with them right away.
                    num_new_tokens = request.num_tokens_with_spec - num_computed_tokens
                else:
                    num_new_tokens = request.num_tokens - num_computed_tokens
                threshold = self.scheduler_config.long_prefill_token_threshold
                if 0 < threshold < num_new_tokens:
                    num_new_tokens = threshold

                # chunked prefill has to be enabled explicitly to allow
                # pooling requests to be chunked
                if not self.scheduler_config.enable_chunked_prefill and num_new_tokens > token_budget:
                    # If chunked_prefill is disabled,
                    # we can stop the scheduling here.
                    break

                num_new_tokens = min(num_new_tokens, token_budget)
                assert num_new_tokens > 0

                # Schedule encoder inputs.
                if request.has_encoder_inputs:
                    (
                        encoder_inputs_to_schedule,
                        num_new_tokens,
                        new_encoder_compute_budget,
                        external_load_encoder_input,
                    ) = self._try_schedule_encoder_inputs(
                        request,
                        num_computed_tokens,
                        num_new_tokens,
                        encoder_compute_budget,
                        shift_computed_tokens=1 if self.use_eagle else 0,
                    )
                    if num_new_tokens == 0:
                        # The request cannot be scheduled.
                        break

            # Handles an edge case when P/D Disaggregation
            # is used with Spec Decoding where an
            # extra block gets allocated which
            # creates a mismatch between the number
            # of local and remote blocks.
            effective_lookahead_tokens = 0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens

            num_encoder_tokens = (
                self._num_encoder_max_input_tokens if self.is_encoder_decoder and request.has_encoder_inputs else 0
            )

            new_blocks = self.kv_cache_manager.allocate_slots(
                request,
                num_new_tokens,
                num_new_computed_tokens=num_new_local_computed_tokens,
                new_computed_blocks=new_computed_blocks,
                num_lookahead_tokens=effective_lookahead_tokens,
                num_external_computed_tokens=num_external_computed_tokens,
                delay_cache_blocks=load_kv_async,
                num_encoder_tokens=num_encoder_tokens,
            )

            if new_blocks is None:
                # The request cannot be scheduled.

                # NOTE: we need to untouch the request from the encode cache
                # manager
                if request.has_encoder_inputs:
                    self.encoder_cache_manager.free(request)
                break

            # KVTransfer: the connector uses this information to determine
            # if a load is needed for this request.
            if self.connector is not None:
                self.connector.update_state_after_alloc(
                    request,
                    self.kv_cache_manager.get_blocks(request.request_id),
                    num_external_computed_tokens,
                )

            # Request was already popped from self.waiting
            # unless it was re-added above due to new_blocks being None.
            request = self.waiting.pop_request()
            if load_kv_async:
                # If loading async, allocate memory and put request
                # into the WAITING_FOR_REMOTE_KV state.
                skipped_waiting_requests.prepend_request(request)
                request.status = RequestStatus.WAITING_FOR_REMOTE_KVS
                continue

            self._update_connector_prefix_cache_stats(request)

            # For spec_token_ids, the waiting queue has the same processing
            # as the running queue. `request.num_computed_tokens` is only
            # updated further below, so the local `num_computed_tokens`
            # (the value `num_new_tokens` was derived from) must be used
            # here; the two differ when the request entered this iteration
            # with zero computed tokens but hit the local/external cache.
            if self.is_mtp_kv_consumer and request.spec_token_ids:
                num_scheduled_spec_tokens = (
                    num_new_tokens
                    + num_computed_tokens
                    - request.num_tokens
                    - request.num_output_placeholders
                )
                if num_scheduled_spec_tokens > 0:
                    # Trim spec_token_ids list to num_scheduled_spec_tokens.
                    del request.spec_token_ids[num_scheduled_spec_tokens:]
                    scheduled_spec_decode_tokens[request.request_id] = request.spec_token_ids
                # New spec tokens will be set in `update_draft_token_ids` before the
                # next step when applicable.
                request.spec_token_ids = []

            self.running.append(request)
            if self.log_stats:
                request.record_event(EngineCoreEventType.SCHEDULED, scheduled_timestamp)
            if request.status == RequestStatus.WAITING:
                scheduled_new_reqs.append(request)
            elif request.status == RequestStatus.PREEMPTED:
                scheduled_resumed_reqs.append(request)
            else:
                raise RuntimeError(f"Invalid request status: {request.status}")

            if self.lora_config and request.lora_request:
                scheduled_loras.add(request.lora_request.lora_int_id)
            req_to_new_blocks[request.request_id] = self.kv_cache_manager.get_blocks(request.request_id)
            num_scheduled_tokens[request.request_id] = num_new_tokens
            token_budget -= num_new_tokens
            request.status = RequestStatus.RUNNING
            request.num_computed_tokens = num_computed_tokens
            # Count the number of prefix cached tokens.
            if request.num_cached_tokens < 0:
                request.num_cached_tokens = num_computed_tokens
            # Encoder-related.
            if encoder_inputs_to_schedule:
                scheduled_encoder_inputs[request.request_id] = encoder_inputs_to_schedule
                # Allocate the encoder cache.
                for i in encoder_inputs_to_schedule:
                    self.encoder_cache_manager.allocate(request, i)
                encoder_compute_budget = new_encoder_compute_budget
            # Allocate for external load encoder cache
            if external_load_encoder_input:
                for i in external_load_encoder_input:
                    self.encoder_cache_manager.allocate(request, i)
                    if self.ec_connector is not None:
                        self.ec_connector.update_state_after_alloc(request, i)
    # Put back any skipped requests at the head of the waiting queue
    if skipped_waiting_requests:
        self.waiting.prepend_requests(skipped_waiting_requests)

    # Check if the scheduling constraints are satisfied.
    total_num_scheduled_tokens = sum(num_scheduled_tokens.values())
    assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens

    assert token_budget >= 0
    assert len(self.running) <= self.max_num_running_reqs
    # Since some requests in the RUNNING queue may not be scheduled in
    # this step, the total number of scheduled requests can be smaller than
    # len(self.running).
    assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len(scheduled_running_reqs) <= len(self.running)

    # Get the longest common prefix among all requests in the running queue.
    # This can be potentially used for cascade attention.
    num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups)
    with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"):
        if self.running:
            any_request = self.running[0]
            num_common_prefix_blocks = self.kv_cache_manager.get_num_common_prefix_blocks(any_request.request_id)

    # Construct the scheduler output.
    if self.use_v2_model_runner:
        # Resumed requests are reported as new requests in this mode.
        scheduled_new_reqs = scheduled_new_reqs + scheduled_resumed_reqs
        scheduled_resumed_reqs = []
        new_reqs_data = [
            NewRequestData.from_request(
                req,
                req_to_new_blocks[req.request_id].get_block_ids(),
                req._all_token_ids,
            )
            for req in scheduled_new_reqs
        ]
    else:
        new_reqs_data = [
            NewRequestData.from_request(req, req_to_new_blocks[req.request_id].get_block_ids())
            for req in scheduled_new_reqs
        ]

    with record_function_or_nullcontext("schedule: make_cached_request_data"):
        cached_reqs_data = self._make_cached_request_data(
            scheduled_running_reqs,
            scheduled_resumed_reqs,
            num_scheduled_tokens,
            scheduled_spec_decode_tokens,
            req_to_new_blocks,
        )

    # Record the request ids that were scheduled in this step.
    self.prev_step_scheduled_req_ids.clear()
    self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys())

    scheduler_output = RecomputeSchedulerOutput(
        scheduled_new_reqs=new_reqs_data,
        scheduled_cached_reqs=cached_reqs_data,
        num_scheduled_tokens=num_scheduled_tokens,
        total_num_scheduled_tokens=total_num_scheduled_tokens,
        scheduled_spec_decode_tokens=scheduled_spec_decode_tokens,
        scheduled_encoder_inputs=scheduled_encoder_inputs,
        num_common_prefix_blocks=num_common_prefix_blocks,
        preempted_req_ids={req.request_id for req in preempted_reqs},
        # finished_req_ids is an existing state in the scheduler,
        # instead of being newly scheduled in this step.
        # It contains the request IDs that are finished in between
        # the previous and the current steps.
        finished_req_ids=self.finished_req_ids,
        free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(),
        recomputed_reqs=recomputed_reqs,
    )

    # NOTE(Kuntai): this function is designed for multiple purposes:
    # 1. Plan the KV cache store
    # 2. Wrap up all the KV cache load / save ops into an opaque object
    # 3. Clear the internal states of the connector
    if self.connector is not None:
        meta: KVConnectorMetadata = self.connector.build_connector_meta(scheduler_output)
        scheduler_output.kv_connector_metadata = meta

    # Build the connector meta for ECConnector
    if self.ec_connector is not None:
        ec_meta: ECConnectorMetadata = self.ec_connector.build_connector_meta(scheduler_output)
        scheduler_output.ec_connector_metadata = ec_meta

    with record_function_or_nullcontext("schedule: update_after_schedule"):
        self._update_after_schedule(scheduler_output)
    return scheduler_output
|
|
|
|
def update_from_output(
    self,
    scheduler_output: RecomputeSchedulerOutput,
    model_runner_output: ModelRunnerOutput,
) -> dict[int, EngineCoreOutputs]:
    """Apply one step's model-runner results and build the per-client outputs.

    The method proceeds in this order:

    1. Collect optional per-step perf stats and aggregate KV connector stats.
    2. If the connector reported invalid block ids, pass them to
       ``_handle_invalid_blocks`` and remember the affected request ids.
    3. Report every entry of ``scheduler_output.recomputed_reqs`` as an
       ``EngineCoreOutput`` with ``finish_reason=FinishReason.STOP`` and
       ``stop_reason="recomputed"``.
    4. For each request in ``scheduler_output.num_scheduled_tokens``: roll
       back token counters for rejected speculative tokens, run the stop
       check, call ``_free_request`` for stopped requests, slice logprobs,
       advance the structured-output grammar, and append an
       ``EngineCoreOutput`` when there is something to report.
    5. Drop stopped requests from ``self.running`` / ``self.waiting``.
    6. Finish requests whose KV load failed when
       ``self.recompute_kv_load_failures`` is falsy.
    7. Forward finished KV transfers, publish KV cache events, attach the
       finished-request ids and the scheduler stats.

    Args:
        scheduler_output: The scheduling decision this model step executed.
        model_runner_output: The results produced by the model runner for
            that step (sampled tokens, logprobs, pooler outputs, KV
            connector output, ...).

    Returns:
        A dict mapping client index to the ``EngineCoreOutputs`` for that
        client. Scheduler stats, when present, are attached to the first
        value in the dict, or to a new ``EngineCoreOutputs`` stored under
        key ``0`` if the dict would otherwise be empty.
    """
    sampled_token_ids = model_runner_output.sampled_token_ids
    logprobs = model_runner_output.logprobs
    prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict
    num_scheduled_tokens = scheduler_output.num_scheduled_tokens
    pooler_outputs = model_runner_output.pooler_output
    num_nans_in_logits = model_runner_output.num_nans_in_logits
    kv_connector_output = model_runner_output.kv_connector_output
    cudagraph_stats = model_runner_output.cudagraph_stats

    # Perf stats are only gathered when a perf-metrics object exists and
    # reports itself as enabled.
    perf_stats: PerfStats | None = None
    if self.perf_metrics and self.perf_metrics.is_enabled():
        perf_stats = self.perf_metrics.get_step_perf_stats_per_gpu(scheduler_output)

    # Per-client accumulator; defaultdict lets us append without checking
    # whether the client already has an entry.
    outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list)
    spec_decoding_stats: SpecDecodingStats | None = None
    kv_connector_stats: KVConnectorStats | None = (
        kv_connector_output.kv_connector_stats if kv_connector_output else None
    )
    # Fold the scheduler-side connector stats into the ones reported by the
    # model runner, if both are available.
    if kv_connector_stats and self.connector:
        kv_stats = self.connector.get_kv_connector_stats()
        if kv_stats:
            kv_connector_stats = kv_connector_stats.aggregate(kv_stats)

    failed_kv_load_req_ids = None
    if kv_connector_output and kv_connector_output.invalid_block_ids:
        # These blocks contain externally computed tokens that failed to
        # load. Identify affected requests and adjust their computed token
        # count to trigger recomputation of the invalid blocks.
        failed_kv_load_req_ids = self._handle_invalid_blocks(kv_connector_output.invalid_block_ids)

    # return recomputed requests as EngineCoreOutput
    # They carry no new tokens and are reported as stopped with the
    # "recomputed" stop reason.
    if scheduler_output.recomputed_reqs is not None:
        for req_info in scheduler_output.recomputed_reqs:
            outputs[req_info.client_index].append(
                EngineCoreOutput(
                    request_id=req_info.request_id,
                    finish_reason=FinishReason.STOP,
                    new_token_ids=[],
                    stop_reason="recomputed",
                )
            )

    # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more,
    # the below loop can be a performance bottleneck. We should do our best
    # to avoid expensive operations inside the loop.
    # Stopped requests are collected by the status they had before the stop
    # check, and removed from the matching queue after the loop.
    stopped_running_reqs: set[Request] = set()
    stopped_preempted_reqs: set[Request] = set()
    for req_id, num_tokens_scheduled in num_scheduled_tokens.items():
        assert num_tokens_scheduled > 0
        if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids:
            # skip failed or rescheduled requests from KV load failure
            continue
        request = self.requests.get(req_id)
        if request is None:
            # The request is already finished. This can happen if the
            # request is aborted while the model is executing it (e.g.,
            # in pipeline parallelism).
            continue

        req_index = model_runner_output.req_id_to_index[req_id]
        # An empty/None sampled_token_ids yields no generated tokens for
        # every request in this step.
        generated_token_ids = sampled_token_ids[req_index] if sampled_token_ids else []

        scheduled_spec_token_ids = scheduler_output.scheduled_spec_decode_tokens.get(req_id)
        if scheduled_spec_token_ids:
            num_draft_tokens = len(scheduled_spec_token_ids)
            # All generated tokens but one are counted as accepted drafts.
            # NOTE(review): if generated_token_ids is empty here,
            # num_accepted becomes -1 and num_rejected exceeds the number
            # of drafts; confirm that combination cannot occur, or guard it.
            num_accepted = len(generated_token_ids) - 1
            num_rejected = num_draft_tokens - num_accepted
            # num_computed_tokens represents the number of tokens
            # processed in the current step, considering scheduled
            # tokens and rejections. If some tokens are rejected,
            # num_computed_tokens is decreased by the number of rejected
            # tokens.
            if request.num_computed_tokens > 0:
                request.num_computed_tokens -= num_rejected
            # If async scheduling, num_output_placeholders also includes
            # the scheduled spec tokens count and so is similarly adjusted.
            if request.num_output_placeholders > 0:
                request.num_output_placeholders -= num_rejected
            spec_decoding_stats = self.make_spec_decoding_stats(
                spec_decoding_stats,
                num_draft_tokens=num_draft_tokens,
                num_accepted_tokens=num_accepted,
                num_invalid_spec_tokens=scheduler_output.num_invalid_spec_tokens,
                request_id=req_id,
            )

        stopped = False
        new_logprobs = None
        new_token_ids = generated_token_ids
        pooler_output = pooler_outputs[req_index] if pooler_outputs else None
        kv_transfer_params = None
        # Snapshot the status now: the stop handling below can overwrite
        # request.status (the pooling branch does so directly), and the
        # snapshot decides which queue the request is removed from.
        status_before_stop = request.status

        # Check for stop and update request status.
        if new_token_ids:
            # The helper returns the token list to report, which replaces
            # new_token_ids, together with the stop flag.
            new_token_ids, stopped = self._update_request_with_output(request, new_token_ids)
        elif request.pooling_params and pooler_output is not None:
            # Pooling stops as soon as there is output.
            request.status = RequestStatus.FINISHED_STOPPED
            stopped = True

        routed_experts = None
        if stopped:
            if self.vllm_config.model_config.enable_return_routed_experts:
                kv_blocks = self.kv_cache_manager.get_blocks(request.request_id)
                # NOTE(review): only element 0 of the block-id collection is
                # used; presumably the first KV cache group -- confirm.
                block_ids = kv_blocks.get_block_ids()[0]
                # NOTE(review): one less than request.num_tokens; presumably
                # the final token has no slot to read -- confirm.
                num_tokens = request.num_tokens - 1

                # compute slot mapping
                block_ids_array = np.array(block_ids, dtype=np.int32)
                num_blocks = len(block_ids)
                block_size = self.block_size

                # generate block offsets
                block_offsets = np.arange(0, block_size)

                # compute slot mapping: slot = block_id * block_size + offset
                # Broadcasting builds a (num_blocks, block_size) grid; the
                # flattened grid lists slots block by block in block_ids
                # order and is then truncated to num_tokens entries.
                slot_mapping = (
                    block_offsets.reshape((1, block_size)) + block_ids_array.reshape((num_blocks, 1)) * block_size
                ).flatten()[:num_tokens]

                # Runs before _free_request below, while the request's
                # blocks are still looked up through the KV cache manager.
                routed_experts = self.routed_experts_reader.get_routed_experts(indices=slot_mapping)
            # The value returned by _free_request is forwarded to the client
            # as kv_transfer_params.
            kv_transfer_params = self._free_request(request)
            if status_before_stop == RequestStatus.RUNNING:
                stopped_running_reqs.add(request)
            else:
                stopped_preempted_reqs.add(request)

        # Extract sample logprobs if needed.
        if request.sampling_params is not None and request.sampling_params.logprobs is not None and logprobs:
            new_logprobs = logprobs.slice_request(req_index, len(new_token_ids))

        if new_token_ids and self.structured_output_manager.should_advance(request):
            struct_output_request = request.structured_output_request
            assert struct_output_request is not None
            assert struct_output_request.grammar is not None
            ok = struct_output_request.grammar.accept_tokens(req_id, new_token_ids)
            # A rejection is only logged; processing of the request continues.
            if not ok:
                logger.warning(
                    "Unexpected: grammar rejected tokens %s for request %s.",
                    new_token_ids,
                    req_id,
                )

        if num_nans_in_logits is not None and req_id in num_nans_in_logits:
            request.num_nans_in_logits = num_nans_in_logits[req_id]

        # Get prompt logprobs for this request.
        prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id)
        # Only emit an output when there is something to report: new tokens,
        # a pooler output, or KV transfer params from a freed request.
        if new_token_ids or pooler_output is not None or kv_transfer_params:
            # Add EngineCoreOutput for this Request.
            outputs[request.client_index].append(
                EngineCoreOutput(
                    request_id=req_id,
                    new_token_ids=new_token_ids,
                    finish_reason=request.get_finished_reason(),
                    new_logprobs=new_logprobs,
                    new_prompt_logprobs_tensors=prompt_logprobs_tensors,
                    pooling_output=pooler_output,
                    stop_reason=request.stop_reason,
                    events=request.take_events(),
                    kv_transfer_params=kv_transfer_params,
                    trace_headers=request.trace_headers,
                    num_cached_tokens=request.num_cached_tokens,
                    routed_experts=routed_experts,
                    num_nans_in_logits=request.num_nans_in_logits,
                )
            )
        else:
            # Invariant: EngineCore returns no partial prefill outputs.
            assert not prompt_logprobs_tensors

    # Remove the stopped requests from the running and waiting queues.
    if stopped_running_reqs:
        self.running = remove_all(self.running, stopped_running_reqs)
    if stopped_preempted_reqs:
        # This is a rare case and unlikely to impact performance.
        self.waiting.remove_requests(stopped_preempted_reqs)

    # When failed KV loads are not recomputed, the affected requests are
    # finished with an error and reported to their clients without tokens.
    if failed_kv_load_req_ids and not self.recompute_kv_load_failures:
        # Resolve the Request objects before finish_requests is called;
        # the outputs below are built from these saved references.
        requests = [self.requests[req_id] for req_id in failed_kv_load_req_ids]
        self.finish_requests(failed_kv_load_req_ids, RequestStatus.FINISHED_ERROR)
        for request in requests:
            outputs[request.client_index].append(
                EngineCoreOutput(
                    request_id=request.request_id,
                    new_token_ids=[],
                    finish_reason=request.get_finished_reason(),
                    events=request.take_events(),
                    trace_headers=request.trace_headers,
                    num_cached_tokens=request.num_cached_tokens,
                )
            )

    # KV Connector: update state for finished KV Transfers.
    if kv_connector_output:
        self._update_from_kv_xfer_finished(kv_connector_output)

    # collect KV cache events from KV cache manager
    events = self.kv_cache_manager.take_events()

    # collect KV cache events from connector
    if self.connector is not None:
        connector_events = self.connector.take_events()
        if connector_events:
            # The manager's take_events() result may be None, so start a
            # fresh list in that case instead of extending it.
            if events is None:
                events = list(connector_events)
            else:
                events.extend(connector_events)

    # publish collected KV cache events
    if events:
        batch = KVEventBatch(ts=time.time(), events=events)
        self.kv_event_publisher.publish(batch)

    # Create EngineCoreOutputs for all clients that have requests with
    # outputs in this step.
    engine_core_outputs = {client_index: EngineCoreOutputs(outputs=outs) for client_index, outs in outputs.items()}

    finished_req_ids = self.finished_req_ids_dict
    if finished_req_ids:
        # Include ids of requests that finished since last outputs
        # were sent.
        for client_index, finished_set in finished_req_ids.items():
            # Set finished request set in EngineCoreOutputs for this client.
            if (eco := engine_core_outputs.get(client_index)) is not None:
                eco.finished_requests = finished_set
            else:
                engine_core_outputs[client_index] = EngineCoreOutputs(finished_requests=finished_set)
        # Clears the dict only; the sets handed to the outputs above are
        # separate objects and keep their contents.
        finished_req_ids.clear()

    if (stats := self.make_stats(spec_decoding_stats, kv_connector_stats, cudagraph_stats, perf_stats)) is not None:
        # Return stats to only one of the front-ends.
        if (eco := next(iter(engine_core_outputs.values()), None)) is None:
            # We must return the stats even if there are no request
            # outputs this step.
            engine_core_outputs[0] = eco = EngineCoreOutputs()
        eco.scheduler_stats = stats

    return engine_core_outputs
|
|
|
|
|
|
class AsyncRecomputeScheduler(AsyncScheduler, RecomputeScheduler):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|