## # Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # This file is a part of the vllm-ascend project. # Adapted from vllm-project/vllm/vllm/v1/core/sched/scheduler.py # from __future__ import annotations import time from collections import defaultdict, deque from dataclasses import dataclass, fields from vllm.config import SchedulerConfig, VllmConfig from vllm.distributed.ec_transfer.ec_connector.base import ECConnectorMetadata from vllm.distributed.kv_events import KVEventBatch from vllm.distributed.kv_transfer.kv_connector.v1.base import KVConnectorMetadata from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats from vllm.logger import init_logger from vllm.v1.core.kv_cache_manager import KVCacheBlocks from vllm.v1.core.sched.async_scheduler import AsyncScheduler from vllm.v1.core.sched.output import NewRequestData, SchedulerOutput from vllm.v1.core.sched.request_queue import SchedulingPolicy, create_request_queue from vllm.v1.core.sched.scheduler import Scheduler from vllm.v1.core.sched.utils import remove_all from vllm.v1.engine import EngineCoreEventType, EngineCoreOutput, EngineCoreOutputs, FinishReason from vllm.v1.metrics.perf import PerfStats from vllm.v1.outputs import ModelRunnerOutput from vllm.v1.request import Request, RequestStatus, StreamingUpdate from vllm.v1.sample.rejection_sampler import PLACEHOLDER_TOKEN_ID from vllm.v1.spec_decode.metrics import SpecDecodingStats from vllm.v1.utils import ConstantList, record_function_or_nullcontext logger = init_logger(__name__) @dataclass class RecomputeSchedulerConfig(SchedulerConfig): scheduler_cls: str | type[object] = "vllm_ascend.core.recompute_scheduler.RecomputeScheduler" @classmethod def initialize_from_config(cls, vllm_config: VllmConfig): vllm_scheduler_config = vllm_config.scheduler_config scheduler_config = { field.name: getattr(vllm_scheduler_config, field.name) for field in fields(vllm_scheduler_config) if field.init } if vllm_scheduler_config.async_scheduling: scheduler_config["scheduler_cls"] = "vllm_ascend.core.recompute_scheduler.AsyncRecomputeScheduler" else: scheduler_config["scheduler_cls"] = "vllm_ascend.core.recompute_scheduler.RecomputeScheduler" scheduler_config["max_model_len"] = vllm_config.model_config.max_model_len scheduler_config["is_encoder_decoder"] = vllm_config.model_config.is_encoder_decoder return cls(**scheduler_config) @dataclass class RecomputeReqInfo: request_id: str output_token_ids: ConstantList client_index: int = 0 @dataclass class RecomputeSchedulerOutput(SchedulerOutput): recomputed_reqs: list[RecomputeReqInfo] | None = None class RecomputeScheduler(Scheduler): running: list[Request] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # When is_mtp_kv_consumer is true, we will fill request.spec_token_ids # with placeholder tokens to enable full graph when decode nodes pull # the KV cache of one request from prefill nodes. self.is_mtp_kv_consumer = ( self.vllm_config.speculative_config and self.vllm_config.kv_transfer_config and self.vllm_config.kv_transfer_config.is_kv_consumer ) self.is_kv_producer = self.vllm_config.kv_transfer_config and self.vllm_config.kv_transfer_config.is_kv_producer self.is_hybrid_model = ( "qwen3_next" in self.vllm_config.model_config.hf_text_config.model_type or "qwen3_5" in self.vllm_config.model_config.hf_text_config.model_type ) def add_request(self, request: Request) -> None: existing = self.requests.get(request.request_id) if existing is not None: update = StreamingUpdate.from_request(request) if existing.status != RequestStatus.WAITING_FOR_STREAMING_REQ: assert existing.streaming_queue is not None, "duplicate request id" # Queue next input chunk (or finished sentinel). existing.streaming_queue.append(update) elif update is not None: # Commence next input chunk. self._update_request_as_session(existing, update) else: # Streaming-input session finished. self.finish_requests(request.request_id, RequestStatus.FINISHED_ABORTED) else: if request.resumable: request.streaming_queue = deque() # Fill in placeholder tokens to enable full graph compatibility. Without # placeholders, graph matching may fail, forcing eager mode execution. if self.is_kv_producer and self.is_hybrid_model and request.num_tokens > 1: request.prompt_token_ids.pop() request._all_token_ids.pop() request.num_prompt_tokens -= 1 if self.is_mtp_kv_consumer: request.spec_token_ids = [PLACEHOLDER_TOKEN_ID] * self.num_spec_tokens self.waiting.add_request(request) self.requests[request.request_id] = request if self.log_stats: request.record_event(EngineCoreEventType.QUEUED) def _update_waiting_for_remote_kv(self, request: Request) -> bool: """ KV Connector: check if the request_id is finished_recving. The finished_recving_kv_req_ids list is populated on the previous steps()'s update_from_output based on the worker side connector. When the kv transfer is ready, we cache the blocks and the request state will be moved back to WAITING from WAITING_FOR_REMOTE_KV. """ assert self.connector is not None if request.request_id not in self.finished_recving_kv_req_ids: return False if request.request_id in self.failed_recving_kv_req_ids: # Request had KV load failures; num_computed_tokens was already # updated in _update_requests_with_invalid_blocks if request.num_computed_tokens: # Cache any valid computed tokens. self.kv_cache_manager.cache_blocks(request, request.num_computed_tokens) else: # No valid computed tokens, release allocated blocks. # There may be a local cache hit on retry. self.kv_cache_manager.free(request) self.failed_recving_kv_req_ids.remove(request.request_id) else: # Now that the blocks are ready, actually cache them. block_ids = self.kv_cache_manager.get_block_ids(request.request_id) if len(block_ids) == 1: num_computed_tokens = len(block_ids[0]) * self.block_size # Handle the case where num request tokens less than one block. num_computed_tokens = min(num_computed_tokens, request.num_tokens) else: num_computed_tokens = request.num_tokens if num_computed_tokens == request.num_tokens: num_computed_tokens -= 1 # This will cache the blocks iff caching is enabled. self.kv_cache_manager.cache_blocks(request, num_computed_tokens) # Update the request state for scheduling. request.num_computed_tokens = num_computed_tokens # Return that we are ready. self.finished_recving_kv_req_ids.remove(request.request_id) return True def schedule(self) -> RecomputeSchedulerOutput: # NOTE(woosuk) on the scheduling algorithm: # There's no "decoding phase" nor "prefill phase" in the scheduler. # Each request just has the num_computed_tokens and # num_tokens_with_spec. num_tokens_with_spec = # len(prompt_token_ids) + len(output_token_ids) + len(spec_token_ids). # At each step, the scheduler tries to assign tokens to the requests # so that each request's num_computed_tokens can catch up its # num_tokens_with_spec. This is general enough to cover # chunked prefills, prefix caching, speculative decoding, # and the "jump decoding" optimization in the future. scheduled_new_reqs: list[Request] = [] scheduled_resumed_reqs: list[Request] = [] scheduled_running_reqs: list[Request] = [] preempted_reqs: list[Request] = [] recomputed_reqs: list[RecomputeReqInfo] = [] req_to_new_blocks: dict[str, KVCacheBlocks] = {} num_scheduled_tokens: dict[str, int] = {} token_budget = self.max_num_scheduled_tokens # Encoder-related. scheduled_encoder_inputs: dict[str, list[int]] = {} encoder_compute_budget = self.max_num_encoder_input_tokens # Spec decode-related. scheduled_spec_decode_tokens: dict[str, list[int]] = {} # For logging. scheduled_timestamp = time.monotonic() # First, schedule the RUNNING requests. req_index = 0 while req_index < len(self.running) and token_budget > 0: request = self.running[req_index] if ( request.num_output_placeholders > 0 # This is (num_computed_tokens + 1) - (num_output_placeholders - 1). # Since output placeholders are also included in the computed tokens # count, we subtract (num_output_placeholders - 1) to remove any draft # tokens, so that we can be sure no further steps are needed even if # they are all rejected. and request.num_computed_tokens + 2 - request.num_output_placeholders >= request.num_prompt_tokens + request.max_tokens ): # Async scheduling: Avoid scheduling an extra step when we are sure that # the previous step has reached request.max_tokens. We don't schedule # partial draft tokens since this prevents uniform decode optimizations. req_index += 1 continue num_new_tokens = ( request.num_tokens_with_spec + request.num_output_placeholders - request.num_computed_tokens ) if 0 < self.scheduler_config.long_prefill_token_threshold < num_new_tokens: num_new_tokens = self.scheduler_config.long_prefill_token_threshold num_new_tokens = min(num_new_tokens, token_budget) # Make sure the input position does not exceed the max model len. # This is necessary when using spec decoding. num_new_tokens = min(num_new_tokens, self.max_model_len - 1 - request.num_computed_tokens) # Schedule encoder inputs. encoder_inputs_to_schedule = None external_load_encoder_input: list[int] = [] new_encoder_compute_budget = encoder_compute_budget if request.has_encoder_inputs: ( encoder_inputs_to_schedule, num_new_tokens, new_encoder_compute_budget, external_load_encoder_input, ) = self._try_schedule_encoder_inputs( request, request.num_computed_tokens, num_new_tokens, encoder_compute_budget, shift_computed_tokens=1 if self.use_eagle else 0, ) if self.need_mamba_block_aligned_split: num_new_tokens = self._mamba_block_aligned_split(request, num_new_tokens) if num_new_tokens == 0: # The request cannot be scheduled because one of the following # reasons: # 1. No new tokens to schedule. This may happen when # (1) PP>1 and we have already scheduled all prompt tokens # but they are not finished yet. # (2) Async scheduling and the request has reached to either # its max_total_tokens or max_model_len. # 2. The encoder budget is exhausted. # 3. The encoder cache is exhausted. # 4. Insufficient budget for a block-aligned chunk in hybrid # models with mamba cache mode \"align\". # NOTE(woosuk): Here, by doing `continue` instead of `break`, # we do not strictly follow the FCFS scheduling policy and # allow the lower-priority requests to be scheduled. req_index += 1 continue # Schedule newly needed KV blocks for the request. with record_function_or_nullcontext("schedule: allocate_slots"): while True: new_blocks = self.kv_cache_manager.allocate_slots( request, num_new_tokens, num_lookahead_tokens=self.num_lookahead_tokens, ) if new_blocks is not None: # The request can be scheduled. break # The request cannot be scheduled. # Preempt the lowest-priority request. # NOTE: We add the preempted_req to recomputed_reqs in kv_consumer to # drop the request to PD proxy. transfer_config = self.vllm_config.kv_transfer_config if transfer_config is not None and not transfer_config.is_kv_producer: recomputed_req = self.running.pop() self.kv_cache_manager.free(recomputed_req) recomputed_reqs.append( RecomputeReqInfo( recomputed_req.request_id, recomputed_req.output_token_ids, recomputed_req.client_index ) ) if recomputed_req == request: break else: if self.policy == SchedulingPolicy.PRIORITY: preempted_req = max( self.running, key=lambda r: (r.priority, r.arrival_time), ) self.running.remove(preempted_req) if preempted_req in scheduled_running_reqs: preempted_req_id = preempted_req.request_id scheduled_running_reqs.remove(preempted_req) token_budget += num_scheduled_tokens.pop(preempted_req_id) req_to_new_blocks.pop(preempted_req_id) scheduled_spec_decode_tokens.pop(preempted_req_id, None) preempted_encoder_inputs = scheduled_encoder_inputs.pop(preempted_req_id, None) if preempted_encoder_inputs: # Restore encoder compute budget if the preempted # request had encoder inputs scheduled in this step. num_embeds_to_restore = sum( preempted_req.get_num_encoder_embeds(i) for i in preempted_encoder_inputs ) encoder_compute_budget += num_embeds_to_restore req_index -= 1 else: preempted_req = self.running.pop() self._preempt_request(preempted_req, scheduled_timestamp) preempted_reqs.append(preempted_req) if preempted_req == request: # No more request to preempt. Cannot schedule this request. break if new_blocks is None: # Cannot schedule this request. break # Schedule the request. scheduled_running_reqs.append(request) request_id = request.request_id req_to_new_blocks[request_id] = new_blocks num_scheduled_tokens[request_id] = num_new_tokens token_budget -= num_new_tokens req_index += 1 # Speculative decode related. if request.spec_token_ids: num_scheduled_spec_tokens = ( num_new_tokens + request.num_computed_tokens - request.num_tokens - request.num_output_placeholders ) if num_scheduled_spec_tokens > 0: spec_token_ids = request.spec_token_ids if len(spec_token_ids) > num_scheduled_spec_tokens: spec_token_ids = spec_token_ids[:num_scheduled_spec_tokens] scheduled_spec_decode_tokens[request.request_id] = spec_token_ids # New spec tokens will be set in `update_draft_token_ids` before the # next step when applicable. request.spec_token_ids = [] # Encoder-related. if encoder_inputs_to_schedule: scheduled_encoder_inputs[request_id] = encoder_inputs_to_schedule # Allocate the encoder cache. for i in encoder_inputs_to_schedule: self.encoder_cache_manager.allocate(request, i) encoder_compute_budget = new_encoder_compute_budget if external_load_encoder_input: for i in external_load_encoder_input: self.encoder_cache_manager.allocate(request, i) if self.ec_connector is not None: self.ec_connector.update_state_after_alloc(request, i) # Record the LoRAs in scheduled_running_reqs scheduled_loras: set[int] = set() if self.lora_config: scheduled_loras = set( req.lora_request.lora_int_id for req in scheduled_running_reqs if req.lora_request and req.lora_request.lora_int_id > 0 ) assert len(scheduled_loras) <= self.lora_config.max_loras # Use a temporary RequestQueue to collect requests that need to be # skipped and put back at the head of the waiting queue later skipped_waiting_requests = create_request_queue(self.policy) # Next, schedule the WAITING requests. if not preempted_reqs and not recomputed_reqs: while self.waiting and token_budget > 0: if len(self.running) == self.max_num_running_reqs: break request = self.waiting.peek_request() request_id = request.request_id # KVTransfer: skip request if still waiting for remote kvs. if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: is_ready = self._update_waiting_for_remote_kv(request) if is_ready: if request.num_preemptions: # We must be loading for a resumed preemption # rather than a new request. request.status = RequestStatus.PREEMPTED else: request.status = RequestStatus.WAITING else: logger.debug( "%s is still in WAITING_FOR_REMOTE_KVS state.", request_id, ) self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) continue # Skip request if the structured output request is still waiting # for FSM compilation. if request.status == RequestStatus.WAITING_FOR_FSM: structured_output_req = request.structured_output_request if structured_output_req and structured_output_req.grammar: request.status = RequestStatus.WAITING else: self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) continue # Streaming: skip request if still waiting for next streaming req. if request.status == RequestStatus.WAITING_FOR_STREAMING_REQ: assert not request.streaming_queue self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) continue # Check that adding the request still respects the max_loras # constraint. if ( self.lora_config and request.lora_request and ( len(scheduled_loras) == self.lora_config.max_loras and request.lora_request.lora_int_id not in scheduled_loras ) ): # Scheduling would exceed max_loras, skip. self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) continue num_external_computed_tokens = 0 load_kv_async = False connector_prefix_cache_queries, connector_prefix_cache_hits = 0, 0 # Get already-cached tokens. if request.num_computed_tokens == 0: # Get locally-cached tokens. new_computed_blocks, num_new_local_computed_tokens = self.kv_cache_manager.get_computed_blocks( request ) # Get externally-cached tokens if using a KVConnector. if self.connector is not None: ext_tokens, load_kv_async = self.connector.get_num_new_matched_tokens( request, num_new_local_computed_tokens ) if ext_tokens is None: # The request cannot be scheduled because # the KVConnector couldn't determine # the number of matched tokens. self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) continue request.num_external_computed_tokens = ext_tokens num_external_computed_tokens = ext_tokens connector_prefix_cache_queries = request.num_tokens - num_new_local_computed_tokens connector_prefix_cache_hits = num_external_computed_tokens # Total computed tokens (local + external). num_computed_tokens = num_new_local_computed_tokens + num_external_computed_tokens else: # KVTransfer: WAITING reqs have num_computed_tokens > 0 # after async KV recvs are completed. new_computed_blocks = self.kv_cache_manager.empty_kv_cache_blocks num_new_local_computed_tokens = 0 num_computed_tokens = request.num_computed_tokens encoder_inputs_to_schedule = None external_load_encoder_input = [] new_encoder_compute_budget = encoder_compute_budget if load_kv_async: # KVTransfer: loading remote KV, do not allocate for new work. assert num_external_computed_tokens > 0 num_new_tokens = 0 else: # Number of tokens to be scheduled. # We use `request.num_tokens` instead of # `request.num_prompt_tokens` to consider the resumed # requests, which have output tokens. if self.is_mtp_kv_consumer: num_new_tokens = request.num_tokens_with_spec - num_computed_tokens else: num_new_tokens = request.num_tokens - num_computed_tokens threshold = self.scheduler_config.long_prefill_token_threshold if 0 < threshold < num_new_tokens: num_new_tokens = threshold # chunked prefill has to be enabled explicitly to allow # pooling requests to be chunked if not self.scheduler_config.enable_chunked_prefill and num_new_tokens > token_budget: # If chunked_prefill is disabled, # we can stop the scheduling here. break num_new_tokens = min(num_new_tokens, token_budget) assert num_new_tokens > 0 # Schedule encoder inputs. if request.has_encoder_inputs: ( encoder_inputs_to_schedule, num_new_tokens, new_encoder_compute_budget, external_load_encoder_input, ) = self._try_schedule_encoder_inputs( request, num_computed_tokens, num_new_tokens, encoder_compute_budget, shift_computed_tokens=1 if self.use_eagle else 0, ) if num_new_tokens == 0: # The request cannot be scheduled. break if self.need_mamba_block_aligned_split: num_new_tokens = self._mamba_block_aligned_split( request, num_new_tokens, num_new_local_computed_tokens, num_external_computed_tokens, ) if num_new_tokens == 0: break # Handles an edge case when P/D Disaggregation # is used with Spec Decoding where an # extra block gets allocated which # creates a mismatch between the number # of local and remote blocks. effective_lookahead_tokens = 0 if request.num_computed_tokens == 0 else self.num_lookahead_tokens num_encoder_tokens = ( self._num_encoder_max_input_tokens if self.is_encoder_decoder and request.has_encoder_inputs else 0 ) new_blocks = self.kv_cache_manager.allocate_slots( request, num_new_tokens, num_new_computed_tokens=num_new_local_computed_tokens, new_computed_blocks=new_computed_blocks, num_lookahead_tokens=effective_lookahead_tokens, num_external_computed_tokens=num_external_computed_tokens, delay_cache_blocks=load_kv_async, num_encoder_tokens=num_encoder_tokens, ) if new_blocks is None: # The request cannot be scheduled. # NOTE: we need to untouch the request from the encode cache # manager if request.has_encoder_inputs: self.encoder_cache_manager.free(request) break # KVTransfer: the connector uses this info to determine # if a load is needed. Note that # This information is used to determine if a load is # needed for this request. if self.connector is not None: self.connector.update_state_after_alloc( request, self.kv_cache_manager.get_blocks(request_id), num_external_computed_tokens, ) if self.connector_prefix_cache_stats is not None and connector_prefix_cache_queries != 0: self.connector_prefix_cache_stats.record( num_tokens=connector_prefix_cache_queries, num_hits=connector_prefix_cache_hits, preempted=request.num_preemptions > 0, ) # Request was already popped from self.waiting # unless it was re-added above due to new_blocks being None. request = self.waiting.pop_request() if load_kv_async: # If loading async, allocate memory and put request # into the WAITING_FOR_REMOTE_KV state. skipped_waiting_requests.prepend_request(request) request.status = RequestStatus.WAITING_FOR_REMOTE_KVS continue # For spec_token_ids, the waiting queue has the same processing # as the running queue. if self.is_mtp_kv_consumer and request.spec_token_ids: num_scheduled_spec_tokens = ( num_new_tokens + request.num_computed_tokens - request.num_tokens - request.num_output_placeholders ) if num_scheduled_spec_tokens > 0: spec_token_ids = request.spec_token_ids if len(spec_token_ids) > num_scheduled_spec_tokens: spec_token_ids = spec_token_ids[:num_scheduled_spec_tokens] scheduled_spec_decode_tokens[request.request_id] = spec_token_ids # New spec tokens will be set in `update_draft_token_ids` before the # next step when applicable. request.spec_token_ids = [] self.running.append(request) if self.log_stats: request.record_event(EngineCoreEventType.SCHEDULED, scheduled_timestamp) if request.status == RequestStatus.WAITING: scheduled_new_reqs.append(request) elif request.status == RequestStatus.PREEMPTED: scheduled_resumed_reqs.append(request) else: raise RuntimeError(f"Invalid request status: {request.status}") if self.lora_config and request.lora_request: scheduled_loras.add(request.lora_request.lora_int_id) req_to_new_blocks[request_id] = self.kv_cache_manager.get_blocks(request_id) num_scheduled_tokens[request_id] = num_new_tokens token_budget -= num_new_tokens request.status = RequestStatus.RUNNING request.num_computed_tokens = num_computed_tokens # Count the number of prefix cached tokens. if request.num_cached_tokens < 0: request.num_cached_tokens = num_computed_tokens # Encoder-related. if encoder_inputs_to_schedule: scheduled_encoder_inputs[request_id] = encoder_inputs_to_schedule # Allocate the encoder cache. for i in encoder_inputs_to_schedule: self.encoder_cache_manager.allocate(request, i) encoder_compute_budget = new_encoder_compute_budget # Allocate for external load encoder cache if external_load_encoder_input: for i in external_load_encoder_input: self.encoder_cache_manager.allocate(request, i) if self.ec_connector is not None: self.ec_connector.update_state_after_alloc(request, i) # Put back any skipped requests at the head of the waiting queue if skipped_waiting_requests: self.waiting.prepend_requests(skipped_waiting_requests) # Check if the scheduling constraints are satisfied. total_num_scheduled_tokens = sum(num_scheduled_tokens.values()) assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens assert token_budget >= 0 assert len(self.running) <= self.max_num_running_reqs # Since some requests in the RUNNING queue may not be scheduled in # this step, the total number of scheduled requests can be smaller than # len(self.running). assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len(scheduled_running_reqs) <= len(self.running) # Get the longest common prefix among all requests in the running queue. # This can be potentially used for cascade attention. num_common_prefix_blocks = [0] * len(self.kv_cache_config.kv_cache_groups) with record_function_or_nullcontext("schedule: get_num_common_prefix_blocks"): if self.running: any_request_id = self.running[0].request_id num_common_prefix_blocks = self.kv_cache_manager.get_num_common_prefix_blocks(any_request_id) # Construct the scheduler output. if self.use_v2_model_runner: scheduled_new_reqs = scheduled_new_reqs + scheduled_resumed_reqs scheduled_resumed_reqs = [] new_reqs_data = [ NewRequestData.from_request( req, req_to_new_blocks[req.request_id].get_block_ids(), req._all_token_ids, ) for req in scheduled_new_reqs ] else: new_reqs_data = [ NewRequestData.from_request(req, req_to_new_blocks[req.request_id].get_block_ids()) for req in scheduled_new_reqs ] with record_function_or_nullcontext("schedule: make_cached_request_data"): cached_reqs_data = self._make_cached_request_data( scheduled_running_reqs, scheduled_resumed_reqs, num_scheduled_tokens, scheduled_spec_decode_tokens, req_to_new_blocks, ) # Record the request ids that were scheduled in this step. self.prev_step_scheduled_req_ids.clear() self.prev_step_scheduled_req_ids.update(num_scheduled_tokens.keys()) scheduler_output = RecomputeSchedulerOutput( scheduled_new_reqs=new_reqs_data, scheduled_cached_reqs=cached_reqs_data, num_scheduled_tokens=num_scheduled_tokens, total_num_scheduled_tokens=total_num_scheduled_tokens, scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, scheduled_encoder_inputs=scheduled_encoder_inputs, num_common_prefix_blocks=num_common_prefix_blocks, preempted_req_ids={req.request_id for req in preempted_reqs}, # finished_req_ids is an existing state in the scheduler, # instead of being newly scheduled in this step. # It contains the request IDs that are finished in between # the previous and the current steps. finished_req_ids=self.finished_req_ids, free_encoder_mm_hashes=self.encoder_cache_manager.get_freed_mm_hashes(), recomputed_reqs=recomputed_reqs, ) # NOTE(Kuntai): this function is designed for multiple purposes: # 1. Plan the KV cache store # 2. Wrap up all the KV cache load / save ops into an opaque object # 3. Clear the internal states of the connector if self.connector is not None: meta: KVConnectorMetadata = self.connector.build_connector_meta(scheduler_output) scheduler_output.kv_connector_metadata = meta # Build the connector meta for ECConnector if self.ec_connector is not None: ec_meta: ECConnectorMetadata = self.ec_connector.build_connector_meta(scheduler_output) scheduler_output.ec_connector_metadata = ec_meta with record_function_or_nullcontext("schedule: update_after_schedule"): self._update_after_schedule(scheduler_output) return scheduler_output def update_from_output( self, scheduler_output: SchedulerOutput, model_runner_output: ModelRunnerOutput, ) -> dict[int, EngineCoreOutputs]: sampled_token_ids = model_runner_output.sampled_token_ids logprobs = model_runner_output.logprobs prompt_logprobs_dict = model_runner_output.prompt_logprobs_dict num_scheduled_tokens = scheduler_output.num_scheduled_tokens pooler_outputs = model_runner_output.pooler_output num_nans_in_logits = model_runner_output.num_nans_in_logits kv_connector_output = model_runner_output.kv_connector_output cudagraph_stats = model_runner_output.cudagraph_stats perf_stats: PerfStats | None = None if self.perf_metrics and self.perf_metrics.is_enabled(): perf_stats = self.perf_metrics.get_step_perf_stats_per_gpu(scheduler_output) outputs: dict[int, list[EngineCoreOutput]] = defaultdict(list) spec_decoding_stats: SpecDecodingStats | None = None kv_connector_stats: KVConnectorStats | None = ( kv_connector_output.kv_connector_stats if kv_connector_output else None ) if kv_connector_stats and self.connector: kv_stats = self.connector.get_kv_connector_stats() if kv_stats: kv_connector_stats = kv_connector_stats.aggregate(kv_stats) failed_kv_load_req_ids = None if kv_connector_output and kv_connector_output.invalid_block_ids: # These blocks contain externally computed tokens that failed to # load. Identify affected requests and adjust their computed token # count to trigger recomputation of the invalid blocks. failed_kv_load_req_ids = self._handle_invalid_blocks(kv_connector_output.invalid_block_ids) # return recomputed requests as EngineCoreOutput if scheduler_output.recomputed_reqs is not None: for req_info in scheduler_output.recomputed_reqs: outputs[req_info.client_index].append( EngineCoreOutput( request_id=req_info.request_id, finish_reason=FinishReason.STOP, new_token_ids=[], stop_reason="recomputed", ) ) # NOTE(woosuk): As len(num_scheduled_tokens) can be up to 1K or more, # the below loop can be a performance bottleneck. We should do our best # to avoid expensive operations inside the loop. stopped_running_reqs: set[Request] = set() stopped_preempted_reqs: set[Request] = set() for req_id, num_tokens_scheduled in num_scheduled_tokens.items(): assert num_tokens_scheduled > 0 if failed_kv_load_req_ids and req_id in failed_kv_load_req_ids: # skip failed or rescheduled requests from KV load failure continue request = self.requests.get(req_id) if request is None or request.is_finished(): # The request is already finished. This can happen if the # request is aborted while the model is executing it (e.g., # in pipeline parallelism or in async scheduling). # NOTE(Kuntai): When delay_free_blocks=True (for async KV # cache transfer in KV connector), the aborted request will not # be set to None (in order to finish async KV transfer). # In this case, we use is_finished() to check. continue req_index = model_runner_output.req_id_to_index[req_id] generated_token_ids = sampled_token_ids[req_index] if sampled_token_ids else [] scheduled_spec_token_ids = scheduler_output.scheduled_spec_decode_tokens.get(req_id) if scheduled_spec_token_ids and generated_token_ids: num_draft_tokens = len(scheduled_spec_token_ids) num_accepted = len(generated_token_ids) - 1 num_rejected = num_draft_tokens - num_accepted # num_computed_tokens represents the number of tokens # processed in the current step, considering scheduled # tokens and rejections. If some tokens are rejected, # num_computed_tokens is decreased by the number of rejected # tokens. if request.num_computed_tokens > 0: request.num_computed_tokens -= num_rejected # If async scheduling, num_output_placeholders also includes # the scheduled spec tokens count and so is similarly adjusted. if request.num_output_placeholders > 0: request.num_output_placeholders -= num_rejected spec_decoding_stats = self.make_spec_decoding_stats( spec_decoding_stats, num_draft_tokens=num_draft_tokens, num_accepted_tokens=num_accepted, num_invalid_spec_tokens=scheduler_output.num_invalid_spec_tokens, request_id=req_id, ) stopped = False new_logprobs = None new_token_ids = generated_token_ids pooler_output = pooler_outputs[req_index] if pooler_outputs else None kv_transfer_params = None status_before_stop = request.status # Check for stop and update request status. if new_token_ids: new_token_ids, stopped = self._update_request_with_output(request, new_token_ids) elif request.pooling_params and pooler_output is not None: # Pooling stops as soon as there is output. request.status = RequestStatus.FINISHED_STOPPED stopped = True routed_experts = None finish_reason = None if stopped: routed_experts = self._get_routed_experts(request) # Capture finish_reason BEFORE _handle_stopped_request, which may # reset the status to WAITING for streaming requests that continue. finish_reason = request.get_finished_reason() finished = self._handle_stopped_request(request) if finished: kv_transfer_params = self._free_request(request) if status_before_stop == RequestStatus.RUNNING: stopped_running_reqs.add(request) else: stopped_preempted_reqs.add(request) # Extract sample logprobs if needed. if request.sampling_params is not None and request.sampling_params.logprobs is not None and logprobs: new_logprobs = logprobs.slice_request(req_index, len(new_token_ids)) if new_token_ids and self.structured_output_manager.should_advance(request): struct_output_request = request.structured_output_request assert struct_output_request is not None assert struct_output_request.grammar is not None ok = struct_output_request.grammar.accept_tokens(req_id, new_token_ids) if not ok: logger.warning( "Unexpected: grammar rejected tokens %s for request %s.", new_token_ids, req_id, ) if num_nans_in_logits is not None and req_id in num_nans_in_logits: request.num_nans_in_logits = num_nans_in_logits[req_id] # Get prompt logprobs for this request. prompt_logprobs_tensors = prompt_logprobs_dict.get(req_id) if new_token_ids or pooler_output is not None or kv_transfer_params or stopped: # Add EngineCoreOutput for this Request. outputs[request.client_index].append( EngineCoreOutput( request_id=req_id, new_token_ids=new_token_ids, finish_reason=finish_reason, new_logprobs=new_logprobs, new_prompt_logprobs_tensors=prompt_logprobs_tensors, pooling_output=pooler_output, stop_reason=request.stop_reason, events=request.take_events(), kv_transfer_params=kv_transfer_params, trace_headers=request.trace_headers, num_cached_tokens=request.num_cached_tokens, num_external_computed_tokens=request.num_external_computed_tokens, routed_experts=routed_experts, num_nans_in_logits=request.num_nans_in_logits, ) ) else: # Invariant: EngineCore returns no partial prefill outputs. assert not prompt_logprobs_tensors # Remove the stopped requests from the running and waiting queues. if stopped_running_reqs: self.running = remove_all(self.running, stopped_running_reqs) if stopped_preempted_reqs: # This is a rare case and unlikely to impact performance. self.waiting.remove_requests(stopped_preempted_reqs) if failed_kv_load_req_ids and not self.recompute_kv_load_failures: requests = [self.requests[req_id] for req_id in failed_kv_load_req_ids] self.finish_requests(failed_kv_load_req_ids, RequestStatus.FINISHED_ERROR) for request in requests: outputs[request.client_index].append( EngineCoreOutput( request_id=request.request_id, new_token_ids=[], finish_reason=request.get_finished_reason(), events=request.take_events(), trace_headers=request.trace_headers, num_cached_tokens=request.num_cached_tokens, ) ) # KV Connector: update state for finished KV Transfers. if kv_connector_output: self._update_from_kv_xfer_finished(kv_connector_output) # collect KV cache events from KV cache manager events = self.kv_cache_manager.take_events() # collect KV cache events from connector if self.connector is not None: connector_events = self.connector.take_events() if connector_events: if events is None: events = list(connector_events) else: events.extend(connector_events) # publish collected KV cache events if events: batch = KVEventBatch(ts=time.time(), events=events) self.kv_event_publisher.publish(batch) # Create EngineCoreOutputs for all clients that have requests with # outputs in this step. engine_core_outputs = {client_index: EngineCoreOutputs(outputs=outs) for client_index, outs in outputs.items()} finished_req_ids = self.finished_req_ids_dict if finished_req_ids: # Include ids of requests that finished since last outputs # were sent. for client_index, finished_set in finished_req_ids.items(): # Set finished request set in EngineCoreOutputs for this client. if (eco := engine_core_outputs.get(client_index)) is not None: eco.finished_requests = finished_set else: engine_core_outputs[client_index] = EngineCoreOutputs(finished_requests=finished_set) finished_req_ids.clear() if (stats := self.make_stats(spec_decoding_stats, kv_connector_stats, cudagraph_stats, perf_stats)) is not None: # Return stats to only one of the front-ends. if (eco := next(iter(engine_core_outputs.values()), None)) is None: # We must return the stats even if there are no request # outputs this step. engine_core_outputs[0] = eco = EngineCoreOutputs() eco.scheduler_stats = stats return engine_core_outputs class AsyncRecomputeScheduler(AsyncScheduler, RecomputeScheduler): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)