From dd087effccad051720fbd26f798b0c68bfbf70d7 Mon Sep 17 00:00:00 2001 From: TaoYu Chen Date: Mon, 8 Sep 2025 10:45:23 +0800 Subject: [PATCH] Refector prepare_inputs in model_runner_v1.py (#2750) ### What this PR does / why we need it? Refector prepare_inputs in model_runner_v1.py for more easy read. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? PASS CI - vLLM version: v0.10.1.1 - vLLM main: https://github.com/vllm-project/vllm/commit/e599e2c65ee32abcc986733ab0a55becea158bb4 --------- Signed-off-by: ChenTaoyu-SJTU --- vllm_ascend/worker/model_runner_v1.py | 147 ++++++++++++++++---------- 1 file changed, 90 insertions(+), 57 deletions(-) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 3afed6c..dee3731 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -880,6 +880,26 @@ class NPUModelRunner(LoRAModelRunnerMixin): mm_embeds.append(mm_embeds_item) return mm_embeds + def _get_cumsum_and_arange( + self, + num_tokens: np.ndarray, + cumsum_dtype: Optional[np.dtype] = None, + ) -> tuple[np.ndarray, np.ndarray]: + """Get the cumulative sum and batched arange of the given array. + # E.g., [2, 5, 3] -> ([2, 7, 10], [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]) + # Equivalent to but faster than: + # np.concatenate([np.arange(n) for n in num_tokens]) + """ + # Step 1. [2, 5, 3] -> [2, 7, 10] + cu_num_tokens = np.cumsum(num_tokens, dtype=cumsum_dtype) + total_num_tokens = cu_num_tokens[-1] + # Step 2. [2, 7, 10] -> [0, 0, 2, 2, 2, 2, 2, 7, 7, 7] + cumsums_offsets = np.repeat(cu_num_tokens - num_tokens, num_tokens) + # Step 3. [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] + arange = self.arange_np[:total_num_tokens] - cumsums_offsets + + return cu_num_tokens, arange + def _prepare_inputs( self, scheduler_output: "SchedulerOutput", @@ -901,17 +921,16 @@ class NPUModelRunner(LoRAModelRunnerMixin): self.input_batch.block_table.commit_block_table(num_reqs) # Get the number of scheduled tokens for each request. - # TODO: The Python loop can be slow. Optimize. - num_scheduled_tokens = np.empty(num_reqs, dtype=np.int32) - num_valid_tokens = np.empty(num_reqs, dtype=np.int32) - max_num_scheduled_tokens = 0 - for i, req_id in enumerate(self.input_batch.req_ids): - num_tokens = scheduler_output.num_scheduled_tokens[req_id] - num_scheduled_tokens[i] = num_tokens - num_valid_tokens[i] = num_tokens - \ - len(scheduler_output.scheduled_spec_decode_tokens.get(req_id, [])) - max_num_scheduled_tokens = max(max_num_scheduled_tokens, - num_tokens) + req_ids = self.input_batch.req_ids + tokens = [scheduler_output.num_scheduled_tokens[i] for i in req_ids] + num_scheduled_tokens = np.array(tokens, dtype=np.int32) + max_num_scheduled_tokens = max(tokens) + num_valid_tokens = np.array([ + num_tokens - + len(scheduler_output.scheduled_spec_decode_tokens.get(i, [])) + for num_tokens, i in zip(tokens, req_ids) + ], + dtype=np.int32) if (self.use_aclgraph and total_num_scheduled_tokens <= self.aclgraph_batch_sizes[-1]): @@ -952,13 +971,15 @@ class NPUModelRunner(LoRAModelRunnerMixin): if self.lora_config: self.set_active_loras(self.input_batch, num_scheduled_tokens) - # Prepare positions + # Get request indices. + # E.g., [2, 5, 3] -> [0, 0, 1, 1, 1, 1, 1, 2, 2, 2] req_indices = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens) - cu_num_tokens = np.cumsum(num_scheduled_tokens) - cumsums_offsets = np.repeat(cu_num_tokens - num_scheduled_tokens, - num_scheduled_tokens) - arange = self.arange_np[:total_num_scheduled_tokens] - cumsums_offsets + + # cu_num_tokens: [2, 5, 3] -> [2, 7, 10] + # arange: [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] + cu_num_tokens, arange = self._get_cumsum_and_arange( + num_scheduled_tokens) positions_np = self.positions_np[:total_num_scheduled_tokens] np.add(self.input_batch.num_computed_tokens_cpu[req_indices], @@ -975,50 +996,73 @@ class NPUModelRunner(LoRAModelRunnerMixin): self.mrope_positions_cpu[:, :total_num_scheduled_tokens], non_blocking=True) - self.positions_cpu[total_num_scheduled_tokens:num_input_tokens].zero_() - self.positions[:num_input_tokens].copy_( - self.positions_cpu[:num_input_tokens], non_blocking=True) - positions_cpu = self.positions_cpu[:num_input_tokens] - positions = self.positions[:num_input_tokens] - self.query_lens = torch.from_numpy(num_scheduled_tokens) + # Get token indices. + # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] + # -> [0, 1, M, M + 1, M + 2, M + 3, M + 4, 2 * M, 2 * M + 1, 2 * M + 2] + # where M is the max_model_len. + token_indices = (positions_np + + req_indices * self.input_batch.token_ids_cpu.shape[1]) - self.seq_lens_np[:num_reqs] = ( - self.input_batch.num_computed_tokens_cpu[:num_reqs] + - num_scheduled_tokens) - seq_lens_cpu = self.seq_lens_cpu[:num_reqs] + # Prepare input_ids. + # NOTE(woosuk): We use torch.index_select instead of np.take here + # because torch.index_select is much faster than np.take for large + # tensors. + torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(), + 0, + torch.from_numpy(token_indices), + out=self.input_ids_cpu[:total_num_scheduled_tokens]) - block_table_indices = (req_indices * self.max_num_blocks_per_req + - positions_np // self.block_size) - - block_table_cpu = self.input_batch.block_table[0].get_cpu_tensor() - block_numbers = block_table_cpu.flatten()[block_table_indices].numpy() - block_offsets = positions_np % self.block_size - np.add(block_numbers * self.block_size, - block_offsets, - out=self.slot_mapping_np[:total_num_scheduled_tokens]) - - attn_state = self._build_attn_state(num_reqs, num_scheduled_tokens, - num_valid_tokens) - - self.attn_mask = self._make_attention_mask(seq_lens=seq_lens_cpu, - position=positions_cpu, - attn_state=attn_state) - self.attn_state = attn_state # type: ignore + # Prepare some information for building Attention-Metadata + # Compute and commit slot mapping + self.input_batch.block_table.compute_slot_mapping( + req_indices, positions_np) + self.input_batch.block_table.commit_slot_mapping( + total_num_scheduled_tokens) + self.slot_mapping_cpu[:total_num_scheduled_tokens].copy_( + self.input_batch.block_table[0]. + slot_mapping_cpu[:total_num_scheduled_tokens]) self.query_start_loc_np[0] = 0 self.query_start_loc_np[1:num_reqs + 1] = cu_num_tokens self.query_start_loc[:num_reqs + 1].copy_( self.query_start_loc_cpu[:num_reqs + 1], non_blocking=True) + + self.seq_lens_np[:num_reqs] = ( + self.input_batch.num_computed_tokens_cpu[:num_reqs] + + num_scheduled_tokens) self.seq_lens[:num_reqs].copy_(self.seq_lens_cpu[:num_reqs], non_blocking=True) # Fill unused with -1. Needed for reshape_and_cache - self.seq_lens[num_reqs:].fill_(0) self.query_start_loc[num_reqs + 1:].fill_(-1) + self.seq_lens[num_reqs:].fill_(0) + + self.query_lens = torch.from_numpy(num_scheduled_tokens) + + # Copy the tensors to the NPU. + self.input_ids[:total_num_scheduled_tokens].copy_( + self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) + + self.positions_cpu[total_num_scheduled_tokens:num_input_tokens].zero_() + self.positions[:num_input_tokens].copy_( + self.positions_cpu[:num_input_tokens], non_blocking=True) + + # Make Attention metadata + positions_cpu = self.positions_cpu[:num_input_tokens] + positions = self.positions[:num_input_tokens] + seq_lens_cpu = self.seq_lens_cpu[:num_reqs] + attn_state = self._build_attn_state(num_reqs, num_scheduled_tokens, + num_valid_tokens) + self.attn_mask = self._make_attention_mask(seq_lens=seq_lens_cpu, + position=positions_cpu, + attn_state=attn_state) + self.attn_state = attn_state # type: ignore self.with_prefill = with_prefill self.num_tokens_across_dp = num_tokens_across_dp self._update_graph_pad_size(with_prefill, maybe_padded_num_tokens) + + # Make AscendCommonAttentionMetadata common_attn_metadata = AscendCommonAttentionMetadata( query_start_loc=self.query_start_loc[:num_reqs + 1], query_start_loc_cpu=self.query_start_loc_cpu[:num_reqs + 1], @@ -1044,19 +1088,8 @@ class NPUModelRunner(LoRAModelRunnerMixin): if self.vllm_config.model_config.use_mla: attn_metadata.num_input_tokens = num_input_tokens - # Prepare input_ids - token_indices = (positions_np + - req_indices * self.input_batch.token_ids_cpu.shape[1]) - torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(), - 0, - torch.from_numpy(token_indices), - out=self.input_ids_cpu[:total_num_scheduled_tokens]) - # Copy the tensors to the NPU. - self.input_ids[:total_num_scheduled_tokens].copy_( - self.input_ids_cpu[:total_num_scheduled_tokens], non_blocking=True) - - # _prepare_inputs may reorder the batch, so we must gather multi - # modal outputs after that to ensure the correct order + # _prepare_inputs may reorder the batch, so we must gather + # multi-modal outputs after that to ensure the correct order if self.is_multimodal_model: # Run the multimodal encoder if any. self._execute_mm_encoder(scheduler_output)