[Refactor] Separate _prepare_inputs to _prepare_inputs and _preprocess (#6191)

### What this PR does / why we need it?

Align with upstream vLLM. This PR will help downstream vLLM-Omni reduce
the cost for maintaining the _prepare_inputs. Besides, it helps
vLLM-Ascend code more readable. In the future, we can follow closer to
vLLM. The `preprocess` logic is same as GPUModelRunner. We don't need to
maintain it anymore.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI.

- vLLM version: v0.14.0
- vLLM main:
d68209402d

---------

Signed-off-by: gcanlin <canlinguosdu@gmail.com>
This commit is contained in:
Canlin Guo
2026-01-26 14:05:23 +08:00
committed by GitHub
parent e3eefdecbd
commit 65289676b4

View File

@@ -62,7 +62,7 @@ from vllm.v1.kv_cache_interface import (AttentionSpec,
MambaSpec, UniformTypeKVCacheSpecs)
from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
LogprobsLists, LogprobsTensors, ModelRunnerOutput,
SamplerOutput,
SamplerOutput, ECConnectorOutput,
make_empty_encoder_model_runner_output)
from vllm.v1.sample.logits_processor import build_logitsprocs
from vllm.v1.sample.metadata import SamplingMetadata
@@ -181,6 +181,7 @@ class ExecuteModelState(NamedTuple):
aux_hidden_states: list[torch.Tensor] | None
attn_metadata: dict[str, Any]
positions: torch.Tensor
ec_connector_output: ECConnectorOutput | None
class NPUModelRunner(GPUModelRunner):
@@ -497,10 +498,8 @@ class NPUModelRunner(GPUModelRunner):
self,
scheduler_output: "SchedulerOutput",
intermediate_tensors: Optional[IntermediateTensors] = None,
) -> tuple[dict[str, Any], torch.Tensor, np.ndarray, int, torch.Tensor,
int, torch.Tensor, SpecDecodeMetadata, Optional[torch.Tensor],
Optional[torch.Tensor], Optional[torch.Tensor], int, dict[str,
Any]]:
) -> tuple[dict[str, Any], np.ndarray, int, Optional[torch.Tensor],
torch.Tensor, Optional[SpecDecodeMetadata], int]:
total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens
assert total_num_scheduled_tokens > 0
num_reqs = self.input_batch.num_reqs
@@ -604,15 +603,10 @@ class NPUModelRunner(GPUModelRunner):
self.query_lens = torch.from_numpy(num_scheduled_tokens)
# Get info across DP ranks.
# NOTE: maybe_padded_num_tokens is only used when using TorchAir with DP,
# Otherwise, it's just max_tokens_across_dp_cpu
(maybe_padded_num_tokens, num_tokens_across_dp,
(num_input_tokens, num_tokens_across_dp,
with_prefill) = self._sync_metadata_across_dp(num_input_tokens,
with_prefill)
self.with_prefill = with_prefill
# TODO: Now that num_input_tokens is basically identical with maybe_padded_num_tokens
# We should consider removing maybe_padded_num_tokens later
num_input_tokens = maybe_padded_num_tokens
# Hot-Swap lora model
if self.lora_config:
@@ -747,108 +741,6 @@ class NPUModelRunner(GPUModelRunner):
discard_request_indices)
self.discard_request_indices.copy_to_gpu(self.num_discarded_requests)
# _prepare_inputs may reorder the batch, so we must gather
# multi-modal outputs after that to ensure the correct order
model_kwargs = self._init_model_kwargs()
if self.is_multimodal_model and not self.model_config.is_encoder_decoder:
self.multimodal_cpu_fields = ["grid_thw"]
self._prepare_multimodal_fields()
with self.maybe_get_ec_connector_output(
scheduler_output,
encoder_cache=self.encoder_cache,
):
# Run the multimodal encoder if any.
self._execute_mm_encoder(scheduler_output)
# NOTE(woosuk): To unify token ids and soft tokens (vision
# embeddings), we always use embeddings (rather than token ids)
# as input to the multimodal model, even when the input is text.
input_ids = self.input_ids.gpu[:total_num_scheduled_tokens]
mm_embeds, is_mm_embed = self._gather_mm_embeddings(
scheduler_output)
inputs_embeds = self.model.embed_input_ids(
input_ids,
multimodal_embeddings=mm_embeds,
is_multimodal=is_mm_embed,
)
# TODO(woosuk): Avoid the copy. Optimize.
self.inputs_embeds.gpu[:total_num_scheduled_tokens].copy_(
inputs_embeds)
inputs_embeds = self.inputs_embeds.gpu[:num_input_tokens]
input_ids = None
elif self.enable_prompt_embeds and get_pp_group().is_first_rank:
# Get the input embeddings for the tokens that are not input embeds,
# then put them into the appropriate positions.
# TODO(qthequartermasterman): Since even when prompt embeds are
# enabled, (a) not all requests will use prompt embeds, and (b)
# after the initial prompt is processed, the rest of the generated
# tokens will be token ids, it is not desirable to have the
# embedding layer outside of the acl graph all the time. The v0
# engine avoids this by "double compiling" the acl graph, once
# with input_ids and again with inputs_embeds, for all num_tokens.
# If a batch only has token ids, then including the embedding layer
# in the acl graph will be more performant (like in the else case
# below).
token_ids_idx = self.is_token_ids.gpu[:total_num_scheduled_tokens] \
.nonzero(as_tuple=False) \
.squeeze(1)
# Some tokens ids may need to become embeds
if token_ids_idx.numel() > 0:
token_ids = self.input_ids.gpu[token_ids_idx]
tokens_to_embeds = self.model.embed_input_ids(
input_ids=token_ids)
self.inputs_embeds.gpu[token_ids_idx] = tokens_to_embeds
inputs_embeds = self.inputs_embeds.gpu[:num_input_tokens]
input_ids = None
else:
# For text-only models, we use token ids as input.
# While it is possible to use embeddings as input just like the
# multimodal models, it is not desirable for performance since
# then the embedding layer is not included in the ACL graph.
input_ids = self.input_ids.gpu[:num_input_tokens]
inputs_embeds = None
if self.uses_mrope:
positions = self.mrope_positions.gpu[:, :num_input_tokens]
elif self.uses_xdrope_dim > 0:
positions = self.xdrope_positions.gpu[:, :num_input_tokens]
else:
positions = self.positions.gpu[:num_input_tokens]
# Run the encoder, just like we do with other multimodal inputs.
if self.model_config.is_encoder_decoder and scheduler_output.scheduled_encoder_inputs:
input_ids = self.input_ids.gpu[:total_num_scheduled_tokens]
positions = self.positions.gpu[:total_num_scheduled_tokens]
encoder_outputs = self._execute_mm_encoder(scheduler_output)
model_kwargs.update({"encoder_outputs": encoder_outputs})
# type: ignore
if get_pp_group().is_first_rank:
intermediate_tensors = None
else:
assert intermediate_tensors is not None
assert self.intermediate_tensors is not None
# If both flashcomm1 and pp are used simultaneously,
# the shape of the received data and the shape of the space to be copied to will not match,
# requiring a recalculation of the incoming data's shape.
tp_size = get_tensor_model_parallel_world_size()
num_input_tokens_with_flashcomm1 = num_input_tokens
if enable_sp():
num_input_tokens_with_flashcomm1 = (num_input_tokens +
tp_size - 1) // tp_size
for k, v in intermediate_tensors.items():
self.intermediate_tensors[
k][:num_input_tokens_with_flashcomm1].copy_(
v[:num_input_tokens_with_flashcomm1],
non_blocking=True)
intermediate_tensors = IntermediateTensors({
k:
v[:num_input_tokens_with_flashcomm1]
for k, v in self.intermediate_tensors.items()
})
use_spec_decode = len(
scheduler_output.scheduled_spec_decode_tokens) > 0
if not use_spec_decode:
@@ -1082,20 +974,15 @@ class NPUModelRunner(GPUModelRunner):
for layer_name in attn_group.layer_names:
attn_metadata[layer_name] = attn_metadata_i
# update global cos, sin
update_cos_sin(positions)
if lmhead_tp_enable():
max_num_reqs_across_dp = self.max_num_reqs * self.uniform_decode_query_len
logits_indices = nn.functional.pad(
logits_indices,
(0, max_num_reqs_across_dp - logits_indices.shape[0]))
return (attn_metadata, positions, num_scheduled_tokens,
num_input_tokens, num_tokens_across_dp,
maybe_padded_num_tokens, logits_indices, spec_decode_metadata,
input_ids, inputs_embeds, intermediate_tensors,
max_num_scheduled_tokens, model_kwargs)
return (attn_metadata, num_scheduled_tokens, num_input_tokens,
num_tokens_across_dp, logits_indices, spec_decode_metadata,
max_num_scheduled_tokens)
# all-gather one hidden-states in sp scene
@staticmethod
@@ -1124,7 +1011,7 @@ class NPUModelRunner(GPUModelRunner):
hidden_states[1]))
return NPUModelRunner._all_gather_hidden_states(hidden_states)
def _generate_process_reqs_hidden_states(self, maybe_padded_num_tokens,
def _generate_process_reqs_hidden_states(self, num_input_tokens,
input_ids, positions,
intermediate_tensors,
inputs_embeds, model_kwargs):
@@ -1138,9 +1025,8 @@ class NPUModelRunner(GPUModelRunner):
forward_context = get_forward_context()
if forward_context.cudagraph_runtime_mode == CUDAGraphMode.FULL \
and not self.use_sparse:
# TODO: maybe_padded_num_tokens will be removed, use num_input_tokens instead
update_full_graph_params(self.attn_backend, self.update_stream, forward_context,
maybe_padded_num_tokens, self.vllm_config,
num_input_tokens, self.vllm_config,
self.vllm_config.speculative_config)
if get_forward_context().sp_enabled and not isinstance(
@@ -1458,7 +1344,7 @@ class NPUModelRunner(GPUModelRunner):
with self.maybe_get_ec_connector_output(
scheduler_output,
encoder_cache=self.encoder_cache,
):
) as ec_connector_output:
self._execute_mm_encoder(scheduler_output)
return make_empty_encoder_model_runner_output(
scheduler_output)
@@ -1476,13 +1362,18 @@ class NPUModelRunner(GPUModelRunner):
if self.dynamic_eplb:
self.eplb_updator.forward_before()
(attn_metadata, positions, num_scheduled_tokens_np,
num_input_tokens, num_tokens_across_dp, maybe_padded_num_tokens,
logits_indices, spec_decode_metadata, input_ids, inputs_embeds,
intermediate_tensors, max_query_len,
model_kwargs) = (self._prepare_inputs(scheduler_output,
intermediate_tensors))
(attn_metadata, num_scheduled_tokens_np, num_input_tokens,
num_tokens_across_dp, logits_indices, spec_decode_metadata,
max_query_len) = self._prepare_inputs(scheduler_output)
(input_ids, inputs_embeds, positions, intermediate_tensors,
model_kwargs, ec_connector_output) = self._preprocess(scheduler_output,
num_input_tokens,
intermediate_tensors)
# update global cos, sin
update_cos_sin(positions)
if self.dynamic_eplb:
self.eplb_updator.take_update_info_from_eplb_process()
@@ -1525,7 +1416,7 @@ class NPUModelRunner(GPUModelRunner):
self.maybe_setup_kv_connector(scheduler_output)
hidden_states = self._generate_process_reqs_hidden_states(
maybe_padded_num_tokens, input_ids, positions,
num_input_tokens, input_ids, positions,
intermediate_tensors, inputs_embeds, model_kwargs)
self.maybe_wait_for_kv_save()
@@ -1594,6 +1485,7 @@ class NPUModelRunner(GPUModelRunner):
aux_hidden_states,
attn_metadata,
positions,
ec_connector_output
)
self.kv_connector_output = kv_connector_output
return None
@@ -1628,6 +1520,7 @@ class NPUModelRunner(GPUModelRunner):
aux_hidden_states,
attn_metadata,
positions,
ec_connector_output
) = self.execute_model_state
# Clear ephemeral state.
self.execute_model_state = None
@@ -1694,16 +1587,15 @@ class NPUModelRunner(GPUModelRunner):
if has_kv_transfer_group():
get_kv_transfer_group().clear_connector_metadata()
extra_args = ({"kv_connector_output": kv_connector_output})
model_runner_output = ModelRunnerOutput(
req_ids=req_ids_output_copy,
req_id_to_index=req_id_to_index_output_copy,
sampled_token_ids=valid_sampled_token_ids,
logprobs=logprobs_lists,
prompt_logprobs_dict=prompt_logprobs_dict,
ec_connector_output=ec_connector_output,
kv_connector_output=kv_connector_output,
pooler_output=[],
**extra_args,
)
durations = ProfileExecuteDuration().pop_captured_sync()