From 65289676b42ee53c062bbf418998401fdf46be1b Mon Sep 17 00:00:00 2001 From: Canlin Guo Date: Mon, 26 Jan 2026 14:05:23 +0800 Subject: [PATCH] [Refactor] Separate `_prepare_inputs` to `_prepare_inputs` and `_preprocess` (#6191) ### What this PR does / why we need it? Align with upstream vLLM. This PR will help downstream vLLM-Omni reduce the cost of maintaining `_prepare_inputs`. Besides, it makes the vLLM-Ascend code more readable. In the future, we can follow vLLM more closely. The `_preprocess` logic is the same as in GPUModelRunner, so we don't need to maintain it anymore. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI. - vLLM version: v0.14.0 - vLLM main: https://github.com/vllm-project/vllm/commit/d68209402ddab3f54a09bc1f4de9a9495a283b60 --------- Signed-off-by: gcanlin --- vllm_ascend/worker/model_runner_v1.py | 162 +++++--------------------- 1 file changed, 27 insertions(+), 135 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 59c432c9..6d823721 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -62,7 +62,7 @@ from vllm.v1.kv_cache_interface import (AttentionSpec, MambaSpec, UniformTypeKVCacheSpecs) from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput, LogprobsLists, LogprobsTensors, ModelRunnerOutput, - SamplerOutput, + SamplerOutput, ECConnectorOutput, make_empty_encoder_model_runner_output) from vllm.v1.sample.logits_processor import build_logitsprocs from vllm.v1.sample.metadata import SamplingMetadata @@ -181,6 +181,7 @@ class ExecuteModelState(NamedTuple): aux_hidden_states: list[torch.Tensor] | None attn_metadata: dict[str, Any] positions: torch.Tensor + ec_connector_output: ECConnectorOutput | None class NPUModelRunner(GPUModelRunner): @@ -497,10 +498,8 @@ class NPUModelRunner(GPUModelRunner): self, scheduler_output: "SchedulerOutput", intermediate_tensors: Optional[IntermediateTensors] = 
None, - ) -> tuple[dict[str, Any], torch.Tensor, np.ndarray, int, torch.Tensor, - int, torch.Tensor, SpecDecodeMetadata, Optional[torch.Tensor], - Optional[torch.Tensor], Optional[torch.Tensor], int, dict[str, - Any]]: + ) -> tuple[dict[str, Any], np.ndarray, int, Optional[torch.Tensor], + torch.Tensor, Optional[SpecDecodeMetadata], int]: total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens assert total_num_scheduled_tokens > 0 num_reqs = self.input_batch.num_reqs @@ -604,15 +603,10 @@ class NPUModelRunner(GPUModelRunner): self.query_lens = torch.from_numpy(num_scheduled_tokens) # Get info across DP ranks. - # NOTE: maybe_padded_num_tokens is only used when using TorchAir with DP, - # Otherwise, it's just max_tokens_across_dp_cpu - (maybe_padded_num_tokens, num_tokens_across_dp, + (num_input_tokens, num_tokens_across_dp, with_prefill) = self._sync_metadata_across_dp(num_input_tokens, with_prefill) self.with_prefill = with_prefill - # TODO: Now that num_input_tokens is basically identical with maybe_padded_num_tokens - # We should consider removing maybe_padded_num_tokens later - num_input_tokens = maybe_padded_num_tokens # Hot-Swap lora model if self.lora_config: @@ -747,108 +741,6 @@ class NPUModelRunner(GPUModelRunner): discard_request_indices) self.discard_request_indices.copy_to_gpu(self.num_discarded_requests) - # _prepare_inputs may reorder the batch, so we must gather - # multi-modal outputs after that to ensure the correct order - model_kwargs = self._init_model_kwargs() - if self.is_multimodal_model and not self.model_config.is_encoder_decoder: - self.multimodal_cpu_fields = ["grid_thw"] - self._prepare_multimodal_fields() - with self.maybe_get_ec_connector_output( - scheduler_output, - encoder_cache=self.encoder_cache, - ): - # Run the multimodal encoder if any. 
- self._execute_mm_encoder(scheduler_output) - - # NOTE(woosuk): To unify token ids and soft tokens (vision - # embeddings), we always use embeddings (rather than token ids) - # as input to the multimodal model, even when the input is text. - input_ids = self.input_ids.gpu[:total_num_scheduled_tokens] - mm_embeds, is_mm_embed = self._gather_mm_embeddings( - scheduler_output) - - inputs_embeds = self.model.embed_input_ids( - input_ids, - multimodal_embeddings=mm_embeds, - is_multimodal=is_mm_embed, - ) - - # TODO(woosuk): Avoid the copy. Optimize. - self.inputs_embeds.gpu[:total_num_scheduled_tokens].copy_( - inputs_embeds) - inputs_embeds = self.inputs_embeds.gpu[:num_input_tokens] - input_ids = None - elif self.enable_prompt_embeds and get_pp_group().is_first_rank: - # Get the input embeddings for the tokens that are not input embeds, - # then put them into the appropriate positions. - # TODO(qthequartermasterman): Since even when prompt embeds are - # enabled, (a) not all requests will use prompt embeds, and (b) - # after the initial prompt is processed, the rest of the generated - # tokens will be token ids, it is not desirable to have the - # embedding layer outside of the acl graph all the time. The v0 - # engine avoids this by "double compiling" the acl graph, once - # with input_ids and again with inputs_embeds, for all num_tokens. - # If a batch only has token ids, then including the embedding layer - # in the acl graph will be more performant (like in the else case - # below). 
- token_ids_idx = self.is_token_ids.gpu[:total_num_scheduled_tokens] \ - .nonzero(as_tuple=False) \ - .squeeze(1) - # Some tokens ids may need to become embeds - if token_ids_idx.numel() > 0: - token_ids = self.input_ids.gpu[token_ids_idx] - tokens_to_embeds = self.model.embed_input_ids( - input_ids=token_ids) - self.inputs_embeds.gpu[token_ids_idx] = tokens_to_embeds - - inputs_embeds = self.inputs_embeds.gpu[:num_input_tokens] - input_ids = None - else: - # For text-only models, we use token ids as input. - # While it is possible to use embeddings as input just like the - # multimodal models, it is not desirable for performance since - # then the embedding layer is not included in the ACL graph. - input_ids = self.input_ids.gpu[:num_input_tokens] - inputs_embeds = None - if self.uses_mrope: - positions = self.mrope_positions.gpu[:, :num_input_tokens] - elif self.uses_xdrope_dim > 0: - positions = self.xdrope_positions.gpu[:, :num_input_tokens] - else: - positions = self.positions.gpu[:num_input_tokens] - - # Run the encoder, just like we do with other multimodal inputs. - if self.model_config.is_encoder_decoder and scheduler_output.scheduled_encoder_inputs: - input_ids = self.input_ids.gpu[:total_num_scheduled_tokens] - positions = self.positions.gpu[:total_num_scheduled_tokens] - encoder_outputs = self._execute_mm_encoder(scheduler_output) - model_kwargs.update({"encoder_outputs": encoder_outputs}) - - # type: ignore - if get_pp_group().is_first_rank: - intermediate_tensors = None - else: - assert intermediate_tensors is not None - assert self.intermediate_tensors is not None - # If both flashcomm1 and pp are used simultaneously, - # the shape of the received data and the shape of the space to be copied to will not match, - # requiring a recalculation of the incoming data's shape. 
- tp_size = get_tensor_model_parallel_world_size() - num_input_tokens_with_flashcomm1 = num_input_tokens - if enable_sp(): - num_input_tokens_with_flashcomm1 = (num_input_tokens + - tp_size - 1) // tp_size - for k, v in intermediate_tensors.items(): - self.intermediate_tensors[ - k][:num_input_tokens_with_flashcomm1].copy_( - v[:num_input_tokens_with_flashcomm1], - non_blocking=True) - intermediate_tensors = IntermediateTensors({ - k: - v[:num_input_tokens_with_flashcomm1] - for k, v in self.intermediate_tensors.items() - }) - use_spec_decode = len( scheduler_output.scheduled_spec_decode_tokens) > 0 if not use_spec_decode: @@ -1082,20 +974,15 @@ class NPUModelRunner(GPUModelRunner): for layer_name in attn_group.layer_names: attn_metadata[layer_name] = attn_metadata_i - # update global cos, sin - update_cos_sin(positions) - if lmhead_tp_enable(): max_num_reqs_across_dp = self.max_num_reqs * self.uniform_decode_query_len logits_indices = nn.functional.pad( logits_indices, (0, max_num_reqs_across_dp - logits_indices.shape[0])) - return (attn_metadata, positions, num_scheduled_tokens, - num_input_tokens, num_tokens_across_dp, - maybe_padded_num_tokens, logits_indices, spec_decode_metadata, - input_ids, inputs_embeds, intermediate_tensors, - max_num_scheduled_tokens, model_kwargs) + return (attn_metadata, num_scheduled_tokens, num_input_tokens, + num_tokens_across_dp, logits_indices, spec_decode_metadata, + max_num_scheduled_tokens) # all-gather one hidden-states in sp scene @staticmethod @@ -1124,7 +1011,7 @@ class NPUModelRunner(GPUModelRunner): hidden_states[1])) return NPUModelRunner._all_gather_hidden_states(hidden_states) - def _generate_process_reqs_hidden_states(self, maybe_padded_num_tokens, + def _generate_process_reqs_hidden_states(self, num_input_tokens, input_ids, positions, intermediate_tensors, inputs_embeds, model_kwargs): @@ -1138,9 +1025,8 @@ class NPUModelRunner(GPUModelRunner): forward_context = get_forward_context() if 
forward_context.cudagraph_runtime_mode == CUDAGraphMode.FULL \ and not self.use_sparse: - # TODO: maybe_padded_num_tokens will be removed, use num_input_tokens instead update_full_graph_params(self.attn_backend, self.update_stream, forward_context, - maybe_padded_num_tokens, self.vllm_config, + num_input_tokens, self.vllm_config, self.vllm_config.speculative_config) if get_forward_context().sp_enabled and not isinstance( @@ -1458,7 +1344,7 @@ class NPUModelRunner(GPUModelRunner): with self.maybe_get_ec_connector_output( scheduler_output, encoder_cache=self.encoder_cache, - ): + ) as ec_connector_output: self._execute_mm_encoder(scheduler_output) return make_empty_encoder_model_runner_output( scheduler_output) @@ -1476,13 +1362,18 @@ class NPUModelRunner(GPUModelRunner): if self.dynamic_eplb: self.eplb_updator.forward_before() - (attn_metadata, positions, num_scheduled_tokens_np, - num_input_tokens, num_tokens_across_dp, maybe_padded_num_tokens, - logits_indices, spec_decode_metadata, input_ids, inputs_embeds, - intermediate_tensors, max_query_len, - model_kwargs) = (self._prepare_inputs(scheduler_output, - intermediate_tensors)) + (attn_metadata, num_scheduled_tokens_np, num_input_tokens, + num_tokens_across_dp, logits_indices, spec_decode_metadata, + max_query_len) = self._prepare_inputs(scheduler_output) + (input_ids, inputs_embeds, positions, intermediate_tensors, + model_kwargs, ec_connector_output) = self._preprocess(scheduler_output, + num_input_tokens, + intermediate_tensors) + + # update global cos, sin + update_cos_sin(positions) + if self.dynamic_eplb: self.eplb_updator.take_update_info_from_eplb_process() @@ -1525,7 +1416,7 @@ class NPUModelRunner(GPUModelRunner): self.maybe_setup_kv_connector(scheduler_output) hidden_states = self._generate_process_reqs_hidden_states( - maybe_padded_num_tokens, input_ids, positions, + num_input_tokens, input_ids, positions, intermediate_tensors, inputs_embeds, model_kwargs) self.maybe_wait_for_kv_save() @@ -1594,6 
+1485,7 @@ class NPUModelRunner(GPUModelRunner): aux_hidden_states, attn_metadata, positions, + ec_connector_output ) self.kv_connector_output = kv_connector_output return None @@ -1628,6 +1520,7 @@ class NPUModelRunner(GPUModelRunner): aux_hidden_states, attn_metadata, positions, + ec_connector_output ) = self.execute_model_state # Clear ephemeral state. self.execute_model_state = None @@ -1694,16 +1587,15 @@ class NPUModelRunner(GPUModelRunner): if has_kv_transfer_group(): get_kv_transfer_group().clear_connector_metadata() - extra_args = ({"kv_connector_output": kv_connector_output}) - model_runner_output = ModelRunnerOutput( req_ids=req_ids_output_copy, req_id_to_index=req_id_to_index_output_copy, sampled_token_ids=valid_sampled_token_ids, logprobs=logprobs_lists, prompt_logprobs_dict=prompt_logprobs_dict, + ec_connector_output=ec_connector_output, + kv_connector_output=kv_connector_output, pooler_output=[], - **extra_args, ) durations = ProfileExecuteDuration().pop_captured_sync()