From 641a4e60928c977667af775817570ebdbe582bea Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Wed, 2 Jul 2025 12:11:14 +0800 Subject: [PATCH] [CI] Cache sampled token ids in model runner to fix CI error (#1573) ### What this PR does / why we need it? The vllm change https://github.com/vllm-project/vllm/commit/7f280d69c98e560427d2cbc9c3c3c13a83510dca breaks vllm-ascend. This PR fixes the broken CI. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? passed Closes: https://github.com/vllm-project/vllm-ascend/issues/1572 Signed-off-by: wangxiyuan --- tests/e2e/singlecard/test_ilama_lora.py | 1 - vllm_ascend/worker/model_runner_v1.py | 85 +++++++++++++++++-------- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/tests/e2e/singlecard/test_ilama_lora.py b/tests/e2e/singlecard/test_ilama_lora.py index 2d93bce..3276fa5 100644 --- a/tests/e2e/singlecard/test_ilama_lora.py +++ b/tests/e2e/singlecard/test_ilama_lora.py @@ -1,5 +1,4 @@ # SPDX-License-Identifier: Apache-2.0 - import vllm from vllm.lora.request import LoRARequest diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index bc89a61..9cbb2c3 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -527,24 +527,27 @@ class NPUModelRunner(LoRAModelRunnerMixin): self.input_batch.num_tokens[req_index] = end_token_index else: req_data = scheduler_output.scheduled_cached_reqs + is_last_rank = get_pp_group().is_last_rank for i, req_id in enumerate(req_data.req_ids): req_state = self.requests[req_id] num_computed_tokens = req_data.num_computed_tokens[i] - new_token_ids = req_data.new_token_ids[i] new_block_ids = req_data.new_block_ids[i] resumed_from_preemption = req_data.resumed_from_preemption[i] req_state.num_computed_tokens = num_computed_tokens - # Add the sampled token(s) from the previous step (if any). - # This doesn't include "unverified" tokens like spec decode tokens. 
- num_new_tokens = (num_computed_tokens + len(new_token_ids) - - req_state.num_tokens) - if num_new_tokens == 1: - # Avoid slicing list in most common case. - req_state.output_token_ids.append(new_token_ids[-1]) - elif num_new_tokens > 0: - req_state.output_token_ids.extend( - new_token_ids[-num_new_tokens:]) + if not is_last_rank: + new_token_ids = req_data.new_token_ids[i] + # Add the sampled token(s) from the previous step (if any). + # This doesn't include "unverified" tokens like spec decode tokens. + num_new_tokens = (num_computed_tokens + + len(new_token_ids) - + req_state.num_tokens) + if num_new_tokens == 1: + # Avoid slicing list in most common case. + req_state.output_token_ids.append(new_token_ids[-1]) + elif num_new_tokens > 0: + req_state.output_token_ids.extend( + new_token_ids[-num_new_tokens:]) # Update the block IDs. if not resumed_from_preemption: # Append the new blocks to the existing block IDs. @@ -570,25 +573,27 @@ class NPUModelRunner(LoRAModelRunnerMixin): self.input_batch.block_table.append_row( new_block_ids, req_index) - # Add new_token_ids to token_ids_cpu. - start_token_index = num_computed_tokens - end_token_index = num_computed_tokens + len(new_token_ids) - self.input_batch.token_ids_cpu[ - req_index, - start_token_index:end_token_index] = new_token_ids - self.input_batch.num_tokens_no_spec[ - req_index] = end_token_index - # Add spec_token_ids to token_ids_cpu. - spec_token_ids = scheduler_output.scheduled_spec_decode_tokens.get( - req_id, ()) - if spec_token_ids: - start_index = end_token_index - end_token_index += len(spec_token_ids) + + if not is_last_rank: + # Add new_token_ids to token_ids_cpu. + start_token_index = num_computed_tokens + end_token_index = num_computed_tokens + len(new_token_ids) self.input_batch.token_ids_cpu[ req_index, - start_index:end_token_index] = spec_token_ids - # NOTE(woosuk): `num_tokens` here may include spec decode tokens. 
- self.input_batch.num_tokens[req_index] = end_token_index + start_token_index:end_token_index] = new_token_ids + self.input_batch.num_tokens_no_spec[ + req_index] = end_token_index + # Add spec_token_ids to token_ids_cpu. + spec_token_ids = scheduler_output.scheduled_spec_decode_tokens.get( + req_id, ()) + if spec_token_ids: + start_index = end_token_index + end_token_index += len(spec_token_ids) + self.input_batch.token_ids_cpu[ + req_index, + start_index:end_token_index] = spec_token_ids + # NOTE(woosuk): `num_tokens` here may include spec decode tokens. + self.input_batch.num_tokens[req_index] = end_token_index # Check if the batch has changed. If not, we can skip copying the # sampling metadata from CPU to GPU. @@ -1641,6 +1646,30 @@ class NPUModelRunner(LoRAModelRunnerMixin): for i in discard_sampled_tokens_req_indices: valid_sampled_token_ids[i].clear() + if not vllm_version_is("0.9.1"): + # Cache the sampled tokens in the model runner, so that the scheduler + # doesn't need to send them back. + # NOTE(woosuk): As an exception, when using PP, the scheduler sends + # the sampled tokens back, because there's no direct communication + # between the first-stage worker and the last-stage worker. + for req_idx, sampled_ids in enumerate(valid_sampled_token_ids): + if not sampled_ids: + continue + + start_idx = self.input_batch.num_tokens_no_spec[req_idx] + end_idx = start_idx + len(sampled_ids) + assert end_idx <= self.model_config.max_model_len, ( + "Sampled token IDs exceed the max model length. 
" + f"Total number of tokens: {end_idx} > max_model_len: " + f"{self.model_config.max_model_len}") + + self.input_batch.token_ids_cpu[ + req_idx, start_idx:end_idx] = sampled_ids + self.input_batch.num_tokens_no_spec[req_idx] = end_idx + self.input_batch.num_tokens[req_idx] = end_idx + req_id = self.input_batch.req_ids[req_idx] + req_state = self.requests[req_id] + req_state.output_token_ids.extend(sampled_ids) spec_token_ids = self._get_spec_token_ids( valid_sampled_token_ids,