From 1c8c23de584b8de7eba42e9e1e328fb1a5ecc501 Mon Sep 17 00:00:00 2001
From: lidenghui1110 <30521952+lidenghui1110@users.noreply.github.com>
Date: Thu, 18 Dec 2025 15:27:55 +0800
Subject: [PATCH] [Bugfix] fix pipeline parallelism bug introduced by async-scheduling refactor work (#4973)

### What this PR does / why we need it?
Currently, when pipeline parallelism is used together with PD disaggregation, the model runner returns None on non-last-PP-rank stages in `sample_tokens`, which triggers the assertion in vLLM's KVOutputAggregator on [this line](https://github.com/vllm-project/vllm/blob/main/vllm/distributed/kv_transfer/kv_connector/utils.py#L84). All PP workers should instead return a ModelRunnerOutput that carries a kv_connector_output, so the EngineCore scheduler process can aggregate them and confirm that every KV transfer has finished before the KV cache is released.

To fix this, this PR follows vLLM's gpu_model_runner and passes kv_connector_output through `sample_tokens`, so that every rank returns a ModelRunnerOutput; non-last-PP-rank workers return EMPTY_MODEL_RUNNER_OUTPUT with the kv_connector_output attached.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- vLLM version: v0.12.0
- vLLM main: https://github.com/vllm-project/vllm/commit/ad32e3e19ccf0526cb6744a5fed09a138a5fb2f9

---------

Signed-off-by: lidenghui
---
 vllm_ascend/worker/model_runner_v1.py | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py
index 830270ea..4ab4f06d 100644
--- a/vllm_ascend/worker/model_runner_v1.py
+++ b/vllm_ascend/worker/model_runner_v1.py
@@ -21,7 +21,7 @@ import math
 import time
 from collections import defaultdict
 from contextlib import contextmanager, nullcontext
-from copy import deepcopy
+from copy import copy, deepcopy
 from dataclasses import dataclass
 from multiprocessing import Manager
 from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union
@@ -189,7 +189,6 @@ class ExecuteModelState(NamedTuple):
     hidden_states: torch.Tensor
     sample_hidden_states: torch.Tensor
     aux_hidden_states: list[torch.Tensor] | None
-    kv_connector_output: KVConnectorOutput | None
     attn_metadata: dict[str, Any]
     positions: torch.Tensor
 
@@ -1450,6 +1449,7 @@ class NPUModelRunner(GPUModelRunner):
         # For mid-pipeline stages, return the hidden states.
         if not broadcast_pp_output:
             hidden_states.kv_connector_output = kv_connector_output
+            self.kv_connector_output = kv_connector_output
             if need_dump:
                 assert self.debugger is not None
                 self.debugger.stop()
@@ -1496,19 +1496,32 @@ class NPUModelRunner(GPUModelRunner):
             hidden_states,
             sample_hidden_states,
             aux_hidden_states,
-            kv_connector_output,
             attn_metadata,
             positions,
         )
+        self.kv_connector_output = kv_connector_output
         return None
 
     @torch.inference_mode
     def sample_tokens(
         self, grammar_output: "GrammarOutput | None"
     ) -> ModelRunnerOutput | AsyncModelRunnerOutput | IntermediateTensors:
+        kv_connector_output = self.kv_connector_output
+        self.kv_connector_output = None
+
         if self.execute_model_state is None:
             # Nothing to do (PP non-final rank case), output isn't used.
-            return None  # noqa
+            if not kv_connector_output:
+                return None  # noqa
+            # In case of PP with kv transfer, we need to pass through the
+            # kv_connector_output
+            if kv_connector_output.is_empty():
+                return EMPTY_MODEL_RUNNER_OUTPUT
+
+            output = copy(EMPTY_MODEL_RUNNER_OUTPUT)
+            output.kv_connector_output = kv_connector_output
+            return output
+
         need_dump = self.dump_enable and self.debugger is not None
         # Unpack ephemeral state.
         (
@@ -1517,8 +1530,7 @@ class NPUModelRunner(GPUModelRunner):
             spec_decode_metadata,
             hidden_states,
             sample_hidden_states,
-            aux_hidden_states,  # noqa
-            kv_connector_output,
+            aux_hidden_states,
             attn_metadata,
             positions,
         ) = self.execute_model_state
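
For illustration only, below is a minimal, self-contained sketch of the pass-through pattern this patch adopts on non-last-PP-rank workers. `FakeKVConnectorOutput`, `FakeModelRunnerOutput`, and `sample_tokens_non_last_rank` are simplified stand-ins invented for this example (not the real vLLM types); only the control flow of always returning a non-None output that carries the rank's kv_connector_output mirrors the change to `sample_tokens`.

```python
from copy import copy
from dataclasses import dataclass, field


# Hypothetical stand-ins for vLLM's KVConnectorOutput / ModelRunnerOutput,
# used only to illustrate the control flow of this fix.
@dataclass
class FakeKVConnectorOutput:
    finished_sending: set = field(default_factory=set)
    finished_recving: set = field(default_factory=set)

    def is_empty(self) -> bool:
        return not self.finished_sending and not self.finished_recving


@dataclass
class FakeModelRunnerOutput:
    kv_connector_output: FakeKVConnectorOutput | None = None


EMPTY_OUTPUT = FakeModelRunnerOutput()


def sample_tokens_non_last_rank(kv_connector_output):
    """Control flow of the fixed sample_tokens on a non-last PP rank."""
    if not kv_connector_output:
        # No KV transfer in flight: returning None is still acceptable here.
        return None
    if kv_connector_output.is_empty():
        return EMPTY_OUTPUT
    # Copy the shared empty output so it is never mutated, then attach the
    # connector output so the EngineCore-side aggregator can see which KV
    # transfers this rank has finished.
    output = copy(EMPTY_OUTPUT)
    output.kv_connector_output = kv_connector_output
    return output


# The aggregator expects one non-None output per PP rank whenever KV
# transfer is active; a bare None is what used to trip its assertion.
active = FakeKVConnectorOutput(finished_sending={"req-1"})
assert sample_tokens_non_last_rank(active) is not None
```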