From f708d919f88d3a0f7ffbe93d81e7ceac3960a73e Mon Sep 17 00:00:00 2001
From: zhenwenqi2024 <155598497+zhenwenqi2024@users.noreply.github.com>
Date: Fri, 12 Dec 2025 17:27:09 +0800
Subject: [PATCH] [Feature] model_runner refactor (#4764)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What this PR does / why we need it?
Refactor npu_modelrunner so that it stays closely aligned with gpu_modelrunner.

### Does this PR introduce _any_ user-facing change?
NO

- vLLM version: v0.12.0
- vLLM main: https://github.com/vllm-project/vllm/commit/ad32e3e19ccf0526cb6744a5fed09a138a5fb2f9

---------

Signed-off-by: zhenwenqi2024
Signed-off-by: zhenwenqi2024 <155598497+zhenwenqi2024@users.noreply.github.com>
---
 tests/ut/worker/test_input_batch.py     |    3 +
 tests/ut/worker/test_model_runner_v1.py |  113 --
 vllm_ascend/ascend_forward_context.py   |   76 +-
 vllm_ascend/attention/mla_v1.py         |    4 +-
 vllm_ascend/attention/sfa_v1.py         |    4 +-
 vllm_ascend/attention/utils.py          |    4 -
 vllm_ascend/spec_decode/mtp_proposer.py |   28 +-
 vllm_ascend/worker/block_table.py       |   64 +-
 vllm_ascend/worker/model_runner_v1.py   | 2088 +++++------------------
 vllm_ascend/worker/npu_input_batch.py   |  107 +-
 10 files changed, 676 insertions(+), 1815 deletions(-)
 delete mode 100644 tests/ut/worker/test_model_runner_v1.py

diff --git a/tests/ut/worker/test_input_batch.py b/tests/ut/worker/test_input_batch.py
index 15c323b2..5b2cd82b 100644
--- a/tests/ut/worker/test_input_batch.py
+++ b/tests/ut/worker/test_input_batch.py
@@ -24,6 +24,7 @@ from vllm.utils.torch_utils import make_tensor_with_pad
 from vllm.v1.pool.metadata import PoolingMetadata
 from vllm.v1.sample.logits_processor import LogitsProcessors
 from vllm.v1.sample.metadata import SamplingMetadata
+from vllm.v1.utils import CpuGpuBuffer
 
 from vllm_ascend.worker.block_table import BlockTable, MultiGroupBlockTable
 from vllm_ascend.worker.npu_input_batch import CachedRequestState, InputBatch
@@ -67,6 +68,8 @@ def _compare_objs(obj1,
             is_same = True  # if we make it here must be same
         elif a == b:
             is_same = True
+        elif isinstance(a, CpuGpuBuffer):
+            is_same = np.allclose(a.np, b.np) and torch.allclose(a.gpu, b.gpu)
         assert is_same, f"Attribute {attr_name} is different"\
             f" in {obj1} and {obj2}: {a} != {b}"
 
diff --git a/tests/ut/worker/test_model_runner_v1.py b/tests/ut/worker/test_model_runner_v1.py
deleted file mode 100644
index fe945337..00000000
--- a/tests/ut/worker/test_model_runner_v1.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# This file is a part of the vllm-ascend project.
- -from unittest.mock import MagicMock, patch - -import pytest - -from vllm_ascend.ascend_forward_context import MoECommType -from vllm_ascend.utils import AscendDeviceType -from vllm_ascend.worker.model_runner_v1 import NPUModelRunner - - -# yapf: disable -@pytest.mark.parametrize( - "soc_version, enable_expert_parallel, world_size, pipeline_size, num_tokens, mc2_tokens_capacity, quant_type, expected_method", - [ - # Case 1: Expert parallel is disabled, should always be 'allgather' - (AscendDeviceType._910B, False, 8, 2, 100, 256, None, MoECommType.ALLGATHER), - (AscendDeviceType._910_93, False, 16, 2, 500, 256, None, MoECommType.ALLGATHER), - - # Case 2: A2 SOC with w4a8_dynamic -> use alltoall when not mc2 - (AscendDeviceType._910B, True, 8, 1, 100, 256, "w4a8_dynamic", MoECommType.ALLTOALL), - (AscendDeviceType._910B, True, 16, 1, 257, 256, "w4a8_dynamic", MoECommType.ALLTOALL), - (AscendDeviceType._910B, True, 16, 1, 100, 256, "w4a8_dynamic", MoECommType.MC2), # meets mc2 condition - - # Case 3: A2 SOC without w4a8_dynamic -> fallback to allgather - (AscendDeviceType._910B, True, 8, 2, 100, 256, None, MoECommType.ALLGATHER), - (AscendDeviceType._910B, True, 16, 2, 257, 256, None, MoECommType.ALLGATHER), - - # Case 4: A3 SOC - (AscendDeviceType._910_93, True, 8, 2, 100, 256, None, MoECommType.MC2), - (AscendDeviceType._910_93, True, 8, 2, 257, 256, None, MoECommType.ALLTOALL), - ]) -# yapf: enable -def test_select_moe_comm_method(soc_version, enable_expert_parallel, - world_size, pipeline_size, num_tokens, - mc2_tokens_capacity, quant_type, - expected_method): - """ - Tests the _select_moe_comm_method with various configurations including quant_type. - """ - # Mock the NPUModelRunner instance and its dependencies - mock_runner = MagicMock(spec=NPUModelRunner) - mock_runner.parallel_config = MagicMock() - mock_runner.parallel_config.enable_expert_parallel = enable_expert_parallel - mock_runner.parallel_config.world_size_across_dp = world_size - mock_runner.parallel_config.pipeline_parallel_size = pipeline_size - mock_runner.mc2_tokens_capacity = mc2_tokens_capacity - - # Add vllm_config.model_config.hf_config mock with moe_quantize - mock_hf_config = MagicMock() - mock_hf_config.moe_quantize = quant_type - mock_model_config = MagicMock() - mock_model_config.hf_config = mock_hf_config - mock_vllm_config = MagicMock() - mock_vllm_config.model_config = mock_model_config - mock_runner.vllm_config = mock_vllm_config - - # Patch the helper functions - with patch('vllm_ascend.worker.model_runner_v1.get_ascend_device_type', - return_value=soc_version), \ - patch('vllm_ascend.worker.model_runner_v1.is_global_first_rank', - return_value=True), \ - patch('vllm_ascend.worker.model_runner_v1.is_moe_model', - return_value=True): - - # Bind the real method to the mock object - method = NPUModelRunner._select_moe_comm_method( - mock_runner, num_tokens) - - # Assert the result - assert method == expected_method - - -def test_select_moe_comm_method_unsupported_soc(): - """ - Tests that _select_moe_comm_method raises ValueError for an unsupported SOC. 
- """ - mock_runner = MagicMock(spec=NPUModelRunner) - mock_runner.parallel_config = MagicMock() - mock_runner.parallel_config.enable_expert_parallel = True - mock_runner.mc2_tokens_capacity = 256 - - # Add vllm_config.model_config.hf_config mock with moe_quantize - mock_hf_config = MagicMock() - mock_hf_config.moe_quantize = None - mock_model_config = MagicMock() - mock_model_config.hf_config = mock_hf_config - mock_vllm_config = MagicMock() - mock_vllm_config.model_config = mock_model_config - mock_runner.vllm_config = mock_vllm_config - - unsupported_soc = "UnsupportedSOC" - - with patch('vllm_ascend.worker.model_runner_v1.get_ascend_device_type', - return_value=unsupported_soc), \ - patch('vllm_ascend.worker.model_runner_v1.is_global_first_rank', - return_value=True), \ - patch('vllm_ascend.worker.model_runner_v1.is_moe_model', - return_value=True), \ - pytest.raises(ValueError, match=f"Unsupported soc_version: {unsupported_soc}"): - - NPUModelRunner._select_moe_comm_method(mock_runner, 100) diff --git a/vllm_ascend/ascend_forward_context.py b/vllm_ascend/ascend_forward_context.py index 158e35a8..22233bc9 100644 --- a/vllm_ascend/ascend_forward_context.py +++ b/vllm_ascend/ascend_forward_context.py @@ -35,7 +35,6 @@ def set_ascend_forward_context( num_tokens_across_dp: Optional[torch.Tensor] = None, with_prefill: bool = True, in_profile_run: bool = False, - reserved_mc2_mask: Optional[torch.Tensor] = None, moe_comm_type: Optional[MoECommType] = None, num_actual_tokens: Optional[int] = None, aclgraph_runtime_mode: CUDAGraphMode = CUDAGraphMode.NONE, @@ -147,7 +146,7 @@ def set_ascend_forward_context( # NOTE: token num which need to pad to when mc2 forward_context.padded_num_tokens = math.ceil( max_tokens_across_dp / tp_world_size) * tp_world_size - + reserved_mc2_mask = get_mc2_mask() if reserved_mc2_mask is not None: mc2_mask = reserved_mc2_mask[:forward_context. padded_num_tokens] @@ -159,3 +158,76 @@ def set_ascend_forward_context( yield finally: pass + + +_mc2_tokens_capacity: Optional[int] = None +_reserved_mc2_mask: Optional[torch.Tensor] = None +_sin: Optional[torch.Tensor] = None +_cos: Optional[torch.Tensor] = None + + +def set_mc2_tokens_capacity(vllm_config, max_num_reqs, + uniform_decode_query_len): + global _mc2_tokens_capacity + if _mc2_tokens_capacity is not None: + return + if vllm_config.compilation_config.cudagraph_capture_sizes: + max_num_tokens = vllm_config.compilation_config.max_cudagraph_capture_size + else: + # NOTE: To save memory, we cap the max number of tokens to 512. + max_num_tokens = min(max_num_reqs * uniform_decode_query_len, 512) + tp_size = vllm_config.parallel_config.tensor_parallel_size + # Use integer arithmetic for ceiling division. 
+ num_tokens_per_tp_rank = (max_num_tokens + tp_size - 1) // tp_size + _mc2_tokens_capacity = num_tokens_per_tp_rank * tp_size + + +def get_mc2_tokens_capacity(): + return _mc2_tokens_capacity + + +def set_mc2_mask(vllm_config, device): + global _reserved_mc2_mask + if _reserved_mc2_mask is not None: + return + if is_moe_model(vllm_config): + _reserved_mc2_mask = torch.zeros(get_mc2_tokens_capacity(), + dtype=torch.bool, + device=device) + else: + _reserved_mc2_mask = None + + +def get_mc2_mask(): + return _reserved_mc2_mask + + +def set_cos_and_sin(vllm_config, max_num_reqs, decode_token_per_req, dtype, + device): + global _cos + global _sin + if _cos is not None: + return + compilation_config = vllm_config.compilation_config + model_config = vllm_config.model_config + if model_config.use_mla and compilation_config.cudagraph_mode == CUDAGraphMode.FULL_DECODE_ONLY: + rope_dim = model_config.hf_text_config.qk_rope_head_dim + _cos = torch.ones(max_num_reqs * decode_token_per_req, + 1, + 1, + rope_dim, + dtype=dtype, + device=device) + _sin = torch.zeros(max_num_reqs * decode_token_per_req, + 1, + 1, + rope_dim, + dtype=dtype, + device=device) + else: + _cos = None + _sin = None + + +def get_cos_and_sin(): + return _cos, _sin diff --git a/vllm_ascend/attention/mla_v1.py b/vllm_ascend/attention/mla_v1.py index e37b0d71..1674d346 100644 --- a/vllm_ascend/attention/mla_v1.py +++ b/vllm_ascend/attention/mla_v1.py @@ -25,6 +25,7 @@ from vllm.v1.attention.backends.utils import AttentionCGSupport from vllm_ascend import envs from vllm_ascend.ascend_config import get_ascend_config +from vllm_ascend.ascend_forward_context import get_cos_and_sin from vllm_ascend.attention.attention_v1 import AscendAttentionState from vllm_ascend.attention.utils import (AscendCommonAttentionMetadata, maybe_save_kv_layer_to_connector, @@ -625,8 +626,7 @@ class AscendMLAMetadataBuilder: decode_metadata = None if num_decodes > 0: - cos = common_attn_metadata.cos - sin = common_attn_metadata.sin + cos, sin = get_cos_and_sin() # Notice that num_decodes != num_decode_tokens in SpecDecoding Scenario actual_seq_lengths_q = query_start_loc_cpu[1:num_decodes + 1].tolist() diff --git a/vllm_ascend/attention/sfa_v1.py b/vllm_ascend/attention/sfa_v1.py index 03eed5a7..bc1de75f 100644 --- a/vllm_ascend/attention/sfa_v1.py +++ b/vllm_ascend/attention/sfa_v1.py @@ -16,6 +16,7 @@ from vllm.v1.attention.backends.utils import AttentionCGSupport from vllm_ascend import envs from vllm_ascend.ascend_config import get_ascend_config +from vllm_ascend.ascend_forward_context import get_cos_and_sin from vllm_ascend.attention.attention_v1 import AscendAttentionState from vllm_ascend.attention.mla_v1 import MAX_O_PROJ_PREFETCH_SIZE from vllm_ascend.attention.utils import (AscendCommonAttentionMetadata, @@ -186,8 +187,7 @@ class AscendSFAMetadataBuilder: cum_query_lens = common_attn_metadata.query_start_loc[1:num_reqs + 1] seq_lens = common_attn_metadata.seq_lens[:num_reqs] - cos = common_attn_metadata.cos - sin = common_attn_metadata.sin + cos, sin = get_cos_and_sin() assert self.cos_cache is not None and self.sin_cache is not None new_cos = self.cos_cache[input_positions][:, None, None] diff --git a/vllm_ascend/attention/utils.py b/vllm_ascend/attention/utils.py index a2f71de7..25a2f2b8 100644 --- a/vllm_ascend/attention/utils.py +++ b/vllm_ascend/attention/utils.py @@ -100,10 +100,6 @@ class AscendCommonAttentionMetadata: # padding tokens. It is used to handle some padding operations. 
num_input_tokens: int = 0 - # NOTE: This is a temporary solution for rotary embedding in MLA - cos: torch.Tensor = None - sin: torch.Tensor = None - prefill_context_parallel_metadata: Optional[ AscendPrefillContextParallelMetadata] = None diff --git a/vllm_ascend/spec_decode/mtp_proposer.py b/vllm_ascend/spec_decode/mtp_proposer.py index 9408ff96..58310787 100644 --- a/vllm_ascend/spec_decode/mtp_proposer.py +++ b/vllm_ascend/spec_decode/mtp_proposer.py @@ -256,11 +256,12 @@ class MtpProposer(Proposer): self.runner.input_batch. num_computed_tokens_cpu_tensor[:num_reqs]) common_attn_metadata = AscendCommonAttentionMetadata( - query_start_loc=self.runner.query_start_loc[:num_reqs + 1], - query_start_loc_cpu=self.runner. - query_start_loc_cpu[:num_reqs + 1], - seq_lens_cpu=self.runner.seq_lens_cpu, - seq_lens=self.runner.seq_lens[:num_reqs], + query_start_loc=self.runner.query_start_loc.gpu[:num_reqs + + 1], + query_start_loc_cpu=self.runner.query_start_loc. + cpu[:num_reqs + 1], + seq_lens_cpu=self.runner.seq_lens.cpu, + seq_lens=self.runner.seq_lens.gpu[:num_reqs], num_reqs=num_reqs, num_actual_tokens=num_tokens, num_input_tokens=num_tokens, @@ -268,16 +269,14 @@ class MtpProposer(Proposer): num_computed_tokens_cpu=num_computed_tokens_cpu, actual_seq_lengths_q=self.runner.actual_seq_lengths_q, block_table_tensor=self.runner.input_batch.block_table[0]. - get_device_tensor()[:num_reqs], + get_device_tensor(), slot_mapping=self.runner.input_batch.block_table[0]. - slot_mapping, - positions=self.runner.positions, + slot_mapping.gpu, + positions=self.runner.positions.gpu, attn_mask=self.runner.attn_mask, spec_attn_mask=self.runner.spec_attn_mask, attn_state=self.runner.attn_state, decode_token_per_req=self.runner.decode_token_per_req, - cos=self.runner.cos, - sin=self.runner.sin, ) builder = self.runner.attn_groups[0][0].get_metadata_builder() @@ -304,7 +303,6 @@ class MtpProposer(Proposer): num_tokens=num_tokens, with_prefill=with_prefill, num_tokens_across_dp=num_tokens_across_dp, - reserved_mc2_mask=self.runner.reserved_mc2_mask, moe_comm_type=moe_comm_type, in_profile_run=self.runner.in_profile_run, num_actual_tokens=0, @@ -406,7 +404,8 @@ class MtpProposer(Proposer): else: token_indices_to_sample = None # input_ids can be None for multimodal models. 
- target_token_ids = self.runner.input_ids[:num_scheduled_tokens] + target_token_ids = self.runner.input_ids.gpu[: + num_scheduled_tokens] target_positions = positions[:num_scheduled_tokens] target_hidden_states = hidden_states[:num_scheduled_tokens] else: @@ -435,7 +434,7 @@ class MtpProposer(Proposer): target_positions = positions target_hidden_states = hidden_states else: - target_token_ids = self.runner.input_ids[token_indices] + target_token_ids = self.runner.input_ids.gpu[token_indices] target_positions = positions[token_indices] target_hidden_states = hidden_states[token_indices] @@ -748,7 +747,7 @@ class MtpProposer(Proposer): uniform_decode = False has_lora = len(self.runner.input_batch.lora_id_to_lora_request) > 0 aclgraph_runtime_mode, batch_descriptor = \ - self.runner.aclgraph_dispatcher.dispatch(num_tokens=num_input_tokens, uniform_decode=uniform_decode, has_lora=has_lora) + self.runner.cudagraph_dispatcher.dispatch(num_tokens=num_input_tokens, uniform_decode=uniform_decode, has_lora=has_lora) if self.use_async_scheduling: # there is synchronization between mtp steps when enabling aclgraph, # disable aclgraph when use async scheduling to avoid the @@ -781,7 +780,6 @@ class MtpProposer(Proposer): num_tokens=num_input_tokens, with_prefill=with_prefill, num_tokens_across_dp=num_tokens_across_dp, - reserved_mc2_mask=self.runner.reserved_mc2_mask, moe_comm_type=moe_comm_type, aclgraph_runtime_mode=aclgraph_runtime_mode, batch_descriptor=batch_descriptor, diff --git a/vllm_ascend/worker/block_table.py b/vllm_ascend/worker/block_table.py index 2d9e9569..5b450f6c 100644 --- a/vllm_ascend/worker/block_table.py +++ b/vllm_ascend/worker/block_table.py @@ -4,6 +4,7 @@ import numpy as np import torch from vllm.distributed import get_dcp_group, get_pcp_group from vllm.utils.math_utils import cdiv +from vllm.v1.utils import CpuGpuBuffer class BlockTable: @@ -76,32 +77,14 @@ class BlockTable: duplicate_size = 1 if self.pcp_world_size > 1: duplicate_size += num_speculative_tokens - self.block_table = torch.zeros( - (max_num_reqs * duplicate_size, logical_table_size), - device=self.device, - dtype=torch.int32, - ) - self.block_table_cpu = torch.zeros( - (max_num_reqs * duplicate_size, logical_table_size), - device="cpu", - dtype=torch.int32, - pin_memory=pin_memory, - ) - self.block_table_np = self.block_table_cpu.numpy() + self.block_table = self._make_buffer(max_num_reqs * duplicate_size, + logical_table_size, + dtype=torch.int32) self.num_blocks_per_row = np.zeros(max_num_reqs, dtype=np.int32) - - self.slot_mapping_cpu = torch.zeros( + self.slot_mapping = self._make_buffer( self.max_num_batched_tokens + 2 * self.pcp_world_size * self.max_num_reqs, - dtype=torch.int32, - device="cpu", - pin_memory=self.pin_memory) - self.slot_mapping_np = self.slot_mapping_cpu.numpy() - self.slot_mapping = torch.zeros( - self.max_num_batched_tokens + - 2 * self.pcp_world_size * self.max_num_reqs, - dtype=torch.int32, - device=self.device) + dtype=torch.int32) self.kernel_sizes = kernel_sizes self.cp_kv_cache_interleave_size = cp_kv_cache_interleave_size @@ -120,7 +103,7 @@ class BlockTable: num_blocks = len(block_ids) start = self.num_blocks_per_row[row_idx] - self.block_table_np[row_idx, start:start + num_blocks] = block_ids + self.block_table.np[row_idx, start:start + num_blocks] = block_ids self.num_blocks_per_row[row_idx] += num_blocks def add_row(self, block_ids: list[int], row_idx: int) -> None: @@ -129,7 +112,7 @@ class BlockTable: def move_row(self, src: int, tgt: int) -> None: num_blocks = 
self.num_blocks_per_row[src] - self.block_table_np[tgt, :num_blocks] = self.block_table_np[ + self.block_table.np[tgt, :num_blocks] = self.block_table.np[ src, :num_blocks] self.num_blocks_per_row[tgt] = num_blocks @@ -139,7 +122,7 @@ class BlockTable: self.num_blocks_per_row[src] = num_blocks_tgt self.num_blocks_per_row[tgt] = num_blocks_src - self.block_table_np[[src, tgt]] = self.block_table_np[[tgt, src]] + self.block_table.np[[src, tgt]] = self.block_table.np[[tgt, src]] def compute_slot_mapping(self, req_indices: np.ndarray, positions: np.ndarray) -> None: @@ -171,7 +154,7 @@ class BlockTable: self.blocks_per_phys_block + logical_block_idx) - block_numbers = self.block_table_np.ravel()[block_table_indices] + block_numbers = self.block_table.np.ravel()[block_table_indices] # Use virtual_block_size for mask calculation, which marks local # tokens. virtual_block_offsets = positions % virtual_block_size @@ -186,7 +169,7 @@ class BlockTable: # Calculate slot_mapping slot_mapping = block_numbers * self.block_size + block_offsets # Write final slots, use -1 for not-local - self.slot_mapping_np[:req_indices.shape[0]] = np.where( + self.slot_mapping.np[:req_indices.shape[0]] = np.where( mask, slot_mapping, -1) else: assert self.kernel_sizes is not None @@ -203,24 +186,22 @@ class BlockTable: req_indices * self.max_num_blocks_per_req * self.blocks_per_phys_block + logical_block_idx) - block_numbers = self.block_table_np.ravel( + block_numbers = self.block_table.np.ravel( )[block_table_indices] block_offsets = positions % self.block_size np.add(block_numbers * self.block_size, block_offsets, - out=self.slot_mapping_np[:req_indices.shape[0]]) + out=self.slot_mapping.np[:req_indices.shape[0]]) def commit_block_table(self, num_reqs: int) -> None: - self.block_table[:num_reqs].copy_(self.block_table_cpu[:num_reqs], - non_blocking=True) + self.block_table.copy_to_gpu(num_reqs) def commit_slot_mapping(self, num_tokens: int) -> None: - self.slot_mapping[:num_tokens].copy_( - self.slot_mapping_cpu[:num_tokens], non_blocking=True) + self.slot_mapping.copy_to_gpu(num_tokens) def clear(self) -> None: self.block_table.fill_(0) - self.block_table_cpu.fill_(0) + self.block_table.cpu.fill_(0) def _convert_physical_to_logical_blocks( self, physical_blocks: np.ndarray) -> np.ndarray: @@ -243,15 +224,22 @@ class BlockTable: def get_device_tensor(self) -> torch.Tensor: """Returns the device tensor of the block table.""" - return self.block_table + return self.block_table.gpu def get_cpu_tensor(self) -> torch.Tensor: """Returns the CPU tensor of the block table.""" - return self.block_table_cpu + return self.block_table.cpu def get_numpy_array(self) -> np.ndarray: """Returns the numpy array of the block table.""" - return self.block_table_np + return self.block_table.np + + def _make_buffer(self, *size: int | torch.SymInt, + dtype: torch.dtype) -> CpuGpuBuffer: + return CpuGpuBuffer(*size, + dtype=dtype, + device=self.device, + pin_memory=self.pin_memory) class MultiGroupBlockTable: diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 611c6728..c6c881eb 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -17,22 +17,17 @@ # Adapted from vllm-project/vllm/vllm/worker/gpu_model_runner.py # -import copy import gc -import itertools import math import time from collections import defaultdict -from collections.abc import Iterator from contextlib import contextmanager, nullcontext from copy import deepcopy from dataclasses import 
dataclass from multiprocessing import Manager -from typing import (TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, - Union, cast) +from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union import numpy as np -import numpy.typing as npt import regex as re import torch import torch.distributed as dist @@ -50,7 +45,6 @@ from vllm.distributed import (get_tensor_model_parallel_world_size, from vllm.distributed.ec_transfer import get_ec_transfer, has_ec_transfer from vllm.distributed.kv_transfer import (get_kv_transfer_group, has_kv_transfer_group) -from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorBase_V1 from vllm.distributed.parallel_state import (get_dcp_group, get_dp_group, get_ep_group, get_pcp_group, get_pp_group, get_tp_group, @@ -59,32 +53,15 @@ from vllm.forward_context import get_forward_context from vllm.logger import logger from vllm.model_executor.layers.attention_layer_base import AttentionLayerBase from vllm.model_executor.layers.mamba.abstract import MambaBase -from vllm.model_executor.layers.rotary_embedding import MRotaryEmbedding from vllm.model_executor.model_loader import get_model -from vllm.model_executor.models.interfaces import (SupportsMultiModal, - supports_mrope, - supports_transcription) -from vllm.model_executor.models.interfaces_base import ( - VllmModelForPooling, is_pooling_model, is_text_generation_model) -from vllm.multimodal import MULTIMODAL_REGISTRY -from vllm.multimodal.inputs import MultiModalKwargsItem, PlaceholderRange -from vllm.multimodal.utils import group_mm_kwargs_by_modality -from vllm.pooling_params import PoolingParams -from vllm.sampling_params import SamplingType from vllm.sequence import IntermediateTensors -from vllm.tasks import GenerationTask, PoolingTask, SupportedTask -from vllm.utils import length_from_prompt_token_ids_or_embeds from vllm.utils.import_utils import LazyLoader -from vllm.utils.jsontree import json_map_leaves from vllm.utils.math_utils import cdiv from vllm.utils.mem_utils import DeviceMemoryProfiler -from vllm.utils.platform_utils import is_pin_memory_available -from vllm.utils.torch_utils import STR_DTYPE_TO_TORCH_DTYPE, get_dtype_size +from vllm.utils.torch_utils import get_dtype_size from vllm.v1.attention.backends.gdn_attn import GDNAttentionMetadataBuilder -from vllm.v1.attention.backends.utils import ( - AttentionCGSupport, CommonAttentionMetadata, - reorder_batch_to_split_decodes_and_prefills) -from vllm.v1.cudagraph_dispatcher import CudagraphDispatcher +from vllm.v1.attention.backends.utils import (AttentionCGSupport, + CommonAttentionMetadata) from vllm.v1.kv_cache_interface import (AttentionSpec, EncoderOnlyAttentionSpec, FullAttentionSpec, KVCacheConfig, @@ -92,28 +69,24 @@ from vllm.v1.kv_cache_interface import (AttentionSpec, MambaSpec, MLAAttentionSpec, UniformTypeKVCacheSpecs) from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput, - DraftTokenIds, LogprobsTensors, ModelRunnerOutput, - PoolerOutput, + ModelRunnerOutput, make_empty_encoder_model_runner_output) -from vllm.v1.pool.metadata import PoolingMetadata from vllm.v1.sample.metadata import SamplingMetadata -from vllm.v1.sample.rejection_sampler import RejectionSampler from vllm.v1.spec_decode.metadata import SpecDecodeMetadata from vllm.v1.spec_decode.ngram_proposer import NgramProposer from vllm.v1.spec_decode.suffix_decoding import SuffixDecodingProposer -from vllm.v1.utils import CpuGpuBuffer -from vllm.v1.worker.ec_connector_model_runner_mixin import \ - 
ECConnectorModelRunnerMixin +from vllm.v1.worker.gpu_model_runner import (AsyncGPUModelRunnerOutput, + GPUModelRunner) from vllm.v1.worker.kv_connector_model_runner_mixin import KVConnectorOutput -from vllm.v1.worker.lora_model_runner_mixin import LoRAModelRunnerMixin -from vllm.v1.worker.utils import (AttentionGroup, gather_mm_placeholders, - sanity_check_mm_encoder_outputs, - scatter_mm_placeholders) +from vllm.v1.worker.utils import AttentionGroup import vllm_ascend.envs as envs_ascend from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import (MoECommType, - set_ascend_forward_context) + get_mc2_tokens_capacity, + set_ascend_forward_context, + set_cos_and_sin, set_mc2_mask, + set_mc2_tokens_capacity) from vllm_ascend.attention.attention_mask import AttentionMaskBuilder from vllm_ascend.attention.attention_v1 import AscendAttentionState from vllm_ascend.attention.utils import (AscendCommonAttentionMetadata, @@ -149,7 +122,7 @@ from vllm_ascend.utils import (ACL_FORMAT_FRACTAL_ND, ACL_FORMAT_FRACTAL_NZ, AscendDeviceType, ProfileExecuteDuration, enable_sp, get_ascend_device_type, is_enable_nz, is_moe_model, lmhead_tp_enable) -from vllm_ascend.worker.npu_input_batch import CachedRequestState, InputBatch +from vllm_ascend.worker.npu_input_batch import InputBatch if TYPE_CHECKING: import xgrammar as xgr # type: ignore[import-untyped] @@ -206,61 +179,6 @@ def graph_capture(device: torch.device): yield graph_capture_context -# Wrapper for ModelRunnerOutput to support overlapped execution. -class AsyncNPUModelRunnerOutput(AsyncModelRunnerOutput): - - def __init__( - self, - model_runner_output: ModelRunnerOutput, - sampled_token_ids: torch.Tensor, - invalid_req_indices: list[int], - async_output_copy_stream: torch.npu.Stream, - vocab_size: int, - ): - self._model_runner_output = model_runner_output - self._invalid_req_indices = invalid_req_indices - - # Event on the copy stream so we can synchronize the non-blocking copy. - self._async_copy_ready_event = torch.npu.Event() - - # Keep a reference to the device tensor to avoid it being - # deallocated until we finish copying it to the host. - self._sampled_token_ids = sampled_token_ids - self.vocab_size = vocab_size - # Initiate the copy on a separate stream, but do not synchronize it. - default_stream = torch.npu.current_stream() - with torch.npu.stream(async_output_copy_stream): - async_output_copy_stream.wait_stream(default_stream) - self._sampled_token_ids_cpu = self._sampled_token_ids.to( - 'cpu', non_blocking=True) - self._async_copy_ready_event.record() - - def get_output(self) -> ModelRunnerOutput: - """Copy the device tensors to the host and return a ModelRunnerOutput. - - This function blocks until the copy is finished. 
- """ - self._async_copy_ready_event.synchronize() - - # Release the device tensor once the copy has completed - del self._sampled_token_ids - - max_gen_len = self._sampled_token_ids_cpu.shape[-1] - if max_gen_len == 1: - valid_sampled_token_ids = self._sampled_token_ids_cpu.tolist() - for i in self._invalid_req_indices: - valid_sampled_token_ids[i].clear() - else: - valid_sampled_token_ids, _ = RejectionSampler.parse_output( - self._sampled_token_ids_cpu, - self.vocab_size, - self._invalid_req_indices, - return_cu_num_tokens=False) - output = self._model_runner_output - output.sampled_token_ids = valid_sampled_token_ids - return output - - class ExecuteModelState(NamedTuple): """Ephemeral cached state transferred between execute_model() and sample_tokens(), after execute_model() returns None.""" @@ -276,54 +194,34 @@ class ExecuteModelState(NamedTuple): positions: torch.Tensor -class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): +class NPUModelRunner(GPUModelRunner): def __init__(self, vllm_config: VllmConfig, device: torch.device): - self.vllm_config = vllm_config - self.model_config = vllm_config.model_config - self.cache_config = vllm_config.cache_config - self.compilation_config = vllm_config.compilation_config - self.load_config = vllm_config.load_config - self.lora_config = vllm_config.lora_config - self.parallel_config = vllm_config.parallel_config - self.pin_memory = is_pin_memory_available() - self.scheduler_config = vllm_config.scheduler_config - self.speculative_config = vllm_config.speculative_config - self.block_size = vllm_config.cache_config.block_size + with _torch_cuda_wrapper(): + super().__init__(vllm_config, device) + self.max_num_reqs = self.scheduler_config.max_num_seqs self.dp_size = vllm_config.parallel_config.data_parallel_size self.dp_rank = vllm_config.parallel_config.data_parallel_rank - self.dcp_size = get_dcp_group().world_size - self.dcp_rank = get_dcp_group().rank_in_group - self.pcp_size = get_pcp_group().world_size - self.pcp_rank = get_pcp_group( - ).rank_in_group if self.pcp_size > 1 else 0 - decode_max_num_seqs = getattr(self.scheduler_config, - 'decode_max_num_seqs', 0) - self.max_num_reqs = max(self.scheduler_config.max_num_seqs, - decode_max_num_seqs) + try: + self.dcp_size = get_dcp_group().world_size + self.dcp_rank = get_dcp_group().rank_in_group + self.pcp_size = get_pcp_group().world_size + self.pcp_rank = get_pcp_group( + ).rank_in_group if self.pcp_size > 1 else 0 + except Exception: + self.dcp_size = 1 + self.dcp_rank = 0 + self.pcp_size = 1 + self.pcp_rank = 0 if self.pcp_size > 1: self.model_config.max_model_len += 2 * self.pcp_size * self.max_num_reqs - self.max_num_blocks_per_req = cdiv(self.model_config.max_model_len, - self.block_size) - self.max_num_tokens = self.scheduler_config.max_num_batched_tokens - self.device = device if envs_ascend.VLLM_ASCEND_ENABLE_PREFETCH_MLP: self.prefetch_stream = torch.npu.Stream(device=device) else: self.prefetch_stream = None - self.dtype = self.model_config.dtype self.sampler = AscendSampler() - self.reorder_batch_threshold: Optional[int] = None - - # Lazy initialization, these will be set after __init__ - self.kv_caches: List[torch.Tensor] = [] - self.attn_groups: list[list[AttentionGroup]] = [] - self.encoder_cache: Dict[str, torch.Tensor] = {} self.attn_mask = None self.attn_state = None - self.requests: Dict[str, CachedRequestState] = {} - self.intermediate_tensors: Optional[IntermediateTensors] = None - self.runner_only_attn_layers: set[str] = set() # Ascend-specific 
configurations self.ascend_config = get_ascend_config() @@ -340,12 +238,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): else: raise RuntimeError( "Dumping/debugging only works in eager mode.") - - if self.cache_config.cache_dtype == "auto": - self.kv_cache_dtype = self.dtype - else: - self.kv_cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[ - self.cache_config.cache_dtype] # use_hybrid_blocks: if hybrid blocks is used. self.use_hybrid_blocks: bool = False self.need_accepted_tokens: bool = False @@ -353,15 +245,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self.is_multimodal_model = self.model_config.is_multimodal_model self.is_pooling_model = self.model_config.pooler_config is not None self.enable_prompt_embeds = self.model_config.enable_prompt_embeds - if self.is_multimodal_model or self.enable_prompt_embeds: - self.inputs_embeds = self._make_buffer( - self.max_num_tokens, - self.model_config.get_hidden_size(), - dtype=self.dtype, - numpy=False) - self.is_token_ids = self._make_buffer(self.max_num_tokens, - dtype=torch.bool) - + self.block_size = vllm_config.cache_config.block_size # Set up Attention self.use_sparse = hasattr(self.vllm_config.model_config.hf_config, "index_topk") @@ -383,92 +267,11 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self.is_kv_producer = vllm_config.kv_transfer_config.is_kv_producer self.is_kv_consumer = vllm_config.kv_transfer_config.is_kv_consumer - # Persistent batch. - self.input_ids = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device=self.device) - self.positions = torch.zeros(self.max_num_tokens, - dtype=torch.int64, - device=self.device) - self.query_start_loc = torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device=self.device) - self.seq_lens = torch.zeros(self.max_num_reqs, - dtype=torch.int32, - device=self.device) - - # NOTE: This will have some extra memory allocated, is it OK? - if self.vllm_config.model_config.use_mla: - rope_dim = self.model_config.hf_text_config.qk_rope_head_dim - self.cos = torch.ones(self.max_num_reqs * - self.decode_token_per_req, - 1, - 1, - rope_dim, - dtype=self.dtype, - device=self.device) - self.sin = torch.zeros(self.max_num_reqs * - self.decode_token_per_req, - 1, - 1, - rope_dim, - dtype=self.dtype, - device=self.device) - else: - self.cos = None - self.sin = None - - self.uses_mrope = self.model_config.uses_mrope - # Only relevant for models using M-RoPE (e.g, Qwen2-VL) - if self.uses_mrope: - # NOTE: `mrope_positions` is implemented with one additional dummy - # position on purpose to make it non-contiguous so that it can work - # with torch compile. - # See detailed explanation in https://github.com/vllm-project/vllm/pull/12128#discussion_r1926431923 - - # NOTE: When M-RoPE is enabled, position ids are 3D regardless of - # the modality of inputs. For text-only inputs, each dimension has - # identical position IDs, making M-RoPE functionally equivalent to - # 1D-RoPE. - # See page 5 of https://arxiv.org/abs/2409.12191 - self.mrope_positions = torch.zeros((3, self.max_num_tokens + 1), - dtype=torch.int64, - device=self.device) - self.mrope_positions_cpu = torch.zeros( - (3, self.max_num_tokens + 1), - dtype=torch.int64, - device="cpu", - pin_memory=True) - self.mrope_positions_np = self.mrope_positions_cpu.numpy() - - # OPTIMIZATION: Cache the tensors rather than creating them every step. 
- self.arange_np: npt.NDArray[np.int32] = np.arange(max( - self.max_num_reqs + 1, self.model_config.max_model_len, - self.max_num_tokens), - dtype=np.int32) - # NOTE(woosuk): These tensors are "stateless", i.e., they are literally - # a faster version of creating a new tensor every time. Thus, we should - # not make any assumptions about the values in these tensors. - self.input_ids_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device="cpu", - pin_memory=True) - self.positions_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int64, - device="cpu", - pin_memory=True) - self.positions_np = self.positions_cpu.numpy() - - self.query_start_loc_cpu = torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device="cpu", - pin_memory=True) - self.query_start_loc_np = self.query_start_loc_cpu.numpy() - self.seq_lens_cpu = torch.zeros(self.max_num_reqs, - dtype=torch.int32, - device="cpu", - pin_memory=True) - self.seq_lens_np = self.seq_lens_cpu.numpy() + set_cos_and_sin(vllm_config, self.max_num_reqs, + self.uniform_decode_query_len, self.dtype, self.device) + set_mc2_tokens_capacity(vllm_config, self.max_num_reqs, + self.uniform_decode_query_len) + set_mc2_mask(vllm_config, self.device) self.pcp_allgather_restore_idx = torch.zeros( self.max_num_tokens + 2 * self.pcp_size * self.max_num_reqs, dtype=torch.int32, @@ -484,27 +287,15 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): device=self.device) self.num_actual_tokens_pcp_padded = 0 if self.speculative_config and self.pcp_size > 1: - self.input_ids_pcp_full = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device=self.device) - self.input_ids_pcp_full_cpu = torch.zeros(self.max_num_tokens, - dtype=torch.int32, - device="cpu", - pin_memory=True) - self.query_start_loc_pcp_full = torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device=self.device) - self.query_start_loc_pcp_full_cpu = \ - torch.zeros(self.max_num_reqs + 1, - dtype=torch.int32, - device="cpu", - pin_memory=True) - self.query_start_loc_pcp_full_np = \ - self.query_start_loc_pcp_full_cpu.numpy() + self.input_ids_pcp_full = self._make_buffer(self.max_num_tokens, + dtype=torch.int32) + self.query_start_loc_pcp_full = self._make_buffer( + self.max_num_reqs + 1, dtype=torch.int32) self.positions_pcp_full = torch.zeros(self.max_num_tokens, dtype=torch.int64, device="cpu", pin_memory=True) + self.decode_token_per_req += self.speculative_config.num_speculative_tokens self.positions_pcp_full_np = self.positions_pcp_full.numpy() self.decode_threshold = 1 + ( self.speculative_config.num_speculative_tokens @@ -512,32 +303,8 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self.use_aclgraph = self._use_aclgraph() - # self.aclgraph_batch_sizes sorts in ascending order. - if (self.compilation_config.cudagraph_capture_sizes and - self.compilation_config.cudagraph_mode != CUDAGraphMode.NONE): - self.aclgraph_batch_sizes = sorted( - self.compilation_config.cudagraph_capture_sizes) - - self.uniform_decode_query_len = 1 if not self.speculative_config else \ - 1 + self.speculative_config.num_speculative_tokens - # aclgraph dispatcher for runtime aclgraph dispatching. - self.aclgraph_dispatcher = CudagraphDispatcher(self.vllm_config) - # Cached outputs. 
- self._draft_token_ids: Optional[Union[list[list[int]], - torch.Tensor]] = None - # NOTE: we need to use `in_profile_run` to determine whether `enable_force_load_balance` is True self.in_profile_run = False - - self._init_mc2_tokens_capacity() - if is_moe_model(vllm_config): - self.reserved_mc2_mask = torch.zeros( - self.mc2_tokens_capacity, - dtype=torch.bool, - device=self.device, - ) - else: - self.reserved_mc2_mask = None self.dynamic_eplb = self.ascend_config.dynamic_eplb or self.ascend_config.expert_map_record_path if self.dynamic_eplb: EPLBParamUtils.check_dynamic_eplb(self.ascend_config.dynamic_eplb) @@ -594,6 +361,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): pin_memory=self.pin_memory, vocab_size=self.model_config.get_vocab_size(), block_sizes=[self.block_size], + kernel_block_sizes=[[self.cache_config.block_size]], is_spec_decode=bool(self.vllm_config.speculative_config), logitsprocs=build_logitsprocs( self.vllm_config, self.device, self.pin_memory, @@ -603,28 +371,32 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): num_speculative_tokens=( self.vllm_config.speculative_config.num_speculative_tokens if self.vllm_config.speculative_config else 0), - kernel_block_sizes=[[self.vllm_config.cache_config.block_size]], cp_kv_cache_interleave_size=self.parallel_config. cp_kv_cache_interleave_size, ) self.num_accepted_tokens = self._make_buffer(self.max_num_reqs, dtype=torch.int64) - self.num_decode_draft_tokens = self._make_buffer(self.max_num_reqs, - dtype=torch.int32) - # Only relevant for multimodal models - self.mm_registry = MULTIMODAL_REGISTRY - self.supports_mm_inputs = self.mm_registry.supports_multimodal_inputs( - self.model_config) - if self.supports_mm_inputs: - self.is_mm_embed = self._make_buffer(self.max_num_tokens, - dtype=torch.bool) - # TODO: EVS Support (Video tokens pruning) (see vllm#22980) - self.is_multimodal_pruning_enabled = False - - # Ephemeral state transferred between execute_model() and sample_tokens(). + self.num_draft_tokens = self._make_buffer(self.max_num_reqs, + dtype=torch.int32) + self.sampled_token_ids_pinned_cpu = torch.empty( + (self.max_num_reqs, 1), + dtype=torch.int32, + device="cpu", + pin_memory=self.pin_memory, + ) + # None in the first PP rank. The rest are set after load_model. + # the attr below is in gpu_modelrunner, but occurs lint so add them here + self.intermediate_tensors: IntermediateTensors | None = None self.execute_model_state: ExecuteModelState | None = None + self.reorder_batch_threshold: int | None = None + self.query_start_loc = self._make_buffer(self.max_num_reqs + 1, + dtype=torch.int32) - self.transfer_event = torch.npu.Event() + def _init_device_properties(self) -> None: + self.num_sms = None + + def _sync_device(self) -> None: + torch.npu.synchronize() def _set_up_drafter(self): # Set up speculative decoding. @@ -653,292 +425,9 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): return get_spec_decode_method(self.speculative_config.method, self.vllm_config, self.device, self) - def _init_mc2_tokens_capacity(self): - # NOTE: To be clear, we need to make sure that during graph capture, the number of - # tokens is less than or equal to mc2_tokens_capacity. According to _set_cudagraph_sizes, - # the max number of tokens in graph is min(max_num_seqs * uniform_decode_query_len, 512). 
- if self.compilation_config.cudagraph_capture_sizes: - max_num_tokens = self.compilation_config.max_cudagraph_capture_size - else: - # NOTE: To save memory, we cap the max number of tokens to 512. - max_num_tokens = min( - self.max_num_reqs * self.uniform_decode_query_len, 512) - tp_size = self.parallel_config.tensor_parallel_size - # Use integer arithmetic for ceiling division. - num_tokens_per_tp_rank = (max_num_tokens + tp_size - 1) // tp_size - self.mc2_tokens_capacity: int = num_tokens_per_tp_rank * tp_size - - def _make_buffer(self, - *size: Union[int, torch.SymInt], - dtype: torch.dtype, - numpy: bool = True) -> CpuGpuBuffer: - # Bfloat16 torch tensors cannot be directly cast to a numpy array, so - # if a bfloat16 buffer is needed without a corresponding numpy array, - # don't bother instantiating the numpy array. - return CpuGpuBuffer(*size, - dtype=dtype, - device=self.device, - pin_memory=self.pin_memory, - with_numpy=numpy) - - def _update_states_after_model_execute( - self, output_token_ids: torch.Tensor) -> None: - """Update the cached states after model execution. - - This is used for MTP/EAGLE for hybrid models, as in linear attention, - only the last token's state is kept. In MTP/EAGLE, for draft tokens - the state are kept util we decide how many tokens are accepted for - each sequence, and a shifting is done during the next iteration - based on the number of accepted tokens. - """ - if not self.model_config.is_hybrid or not self.speculative_config: - return - - # Find the number of accepted tokens for each sequence. - num_accepted_tokens = (torch.cat( - [ - output_token_ids, - torch.full((output_token_ids.size(0), 1), - -1, - device=output_token_ids.device), - ], - dim=1) == -1).int().argmax(-1).cpu().numpy() - for i, num_tokens in enumerate(num_accepted_tokens): - self.input_batch.num_accepted_tokens_cpu[i] = num_tokens - def _use_aclgraph(self) -> bool: return self.compilation_config.cudagraph_mode != CUDAGraphMode.NONE and self.compilation_config.mode == CompilationMode.VLLM_COMPILE and not self.model_config.enforce_eager - def _update_states(self, scheduler_output: "SchedulerOutput") -> None: - # Remove finished requests from the cached states. - for req_id in scheduler_output.finished_req_ids: - self.requests.pop(req_id, None) - - # Remove the finished requests from the persistent batch. - # NOTE(woosuk): There could be an edge case where finished_req_ids and - # scheduled_req_ids overlap. This happens when a request is aborted and - # then resubmitted with the same ID. In this case, we treat them as two - # distinct requests - clearing the cached states for the first request - # and handling the second as a new request. - for req_id in scheduler_output.finished_req_ids: - self.input_batch.remove_request(req_id) - for mm_hash in scheduler_output.free_encoder_mm_hashes: - self.encoder_cache.pop(mm_hash, None) - # Remove the unscheduled requests from the persistent batch. - # NOTE(woosuk): The unscheduled requests are either preempted requests - # or running requests that are not scheduled in this step. We remove - # them from the persistent batch but keep their cached states since - # they will be scheduled again sometime in the future. - scheduled_req_ids = scheduler_output.num_scheduled_tokens.keys() - cached_req_ids = self.input_batch.req_id_to_index.keys() - unscheduled_req_ids = cached_req_ids - scheduled_req_ids - # NOTE(woosuk): The persistent batch optimization assumes that - # consecutive batches contain mostly the same requests. 
If batches - # have low request overlap (e.g., alternating between two distinct - # sets of requests), this optimization becomes very inefficient. - for req_id in unscheduled_req_ids: - self.input_batch.remove_request(req_id) - - req_ids_to_add: list[str] = [] - # Add new requests to the cached states. - for new_req_data in scheduler_output.scheduled_new_reqs: - req_id = new_req_data.req_id - sampling_params = new_req_data.sampling_params - pooling_params = new_req_data.pooling_params - - if sampling_params and \ - sampling_params.sampling_type == SamplingType.RANDOM_SEED: - generator = torch.Generator(device=self.device) - generator.manual_seed(sampling_params.seed) - else: - generator = None - - if pooling_params: - assert (task := pooling_params.task) is not None, ( - "You did not set `task` in the API") - model = cast(VllmModelForPooling, self.get_model()) - to_update = model.pooler.get_pooling_updates(task) - to_update.apply(pooling_params) - - backward_kwargs = {} - backward_kwargs["mm_features"] = new_req_data.mm_features - - # Create request state - PCP/DCP tracking will be computed below - self.requests[req_id] = CachedRequestState( - req_id=req_id, - prompt_token_ids=new_req_data.prompt_token_ids, - prompt_embeds=new_req_data.prompt_embeds, - sampling_params=sampling_params, - pooling_params=pooling_params, - generator=generator, - block_ids=new_req_data.block_ids, - num_computed_tokens=new_req_data.num_computed_tokens, - output_token_ids=[], - lora_request=new_req_data.lora_request, - **backward_kwargs, - ) - - # Only relevant for models using M-RoPE (e.g, Qwen2-VL) - if self.uses_mrope: - self._init_mrope_positions(self.requests[req_id]) - - req_ids_to_add.append(req_id) - - # If this rank is an EC transfer producer, - # skip updating the states of KV cache blocks. - if has_ec_transfer() and get_ec_transfer().is_producer: - return - - # Update the states of the running/resumed requests. - is_last_rank = get_pp_group().is_last_rank - req_data = scheduler_output.scheduled_cached_reqs - # wait until valid_sampled_tokens_count is copied to cpu, - # then use it to update actual num_computed_tokens of each request. - valid_sampled_token_count = self._get_valid_sampled_token_count() - for i, req_id in enumerate(req_data.req_ids): - req_state = self.requests[req_id] - num_computed_tokens = req_data.num_computed_tokens[i] - new_block_ids = req_data.new_block_ids[i] - resumed_from_preemption = req_id in req_data.resumed_req_ids - num_output_tokens = req_data.num_output_tokens[i] - req_index = self.input_batch.req_id_to_index.get(req_id) - # prev_num_draft_len is used in async scheduling mode with - # spec decode. it indicates if need to update num_computed_tokens - # of the request. for example: - # fist step: num_computed_tokens = 0, spec_tokens = [], - # prev_num_draft_len = 0. - # second step: num_computed_tokens = 100(prompt length), - # spec_tokens = [a,b], prev_num_draft_len = 0. - # third step: num_computed_tokens = 100 + 2, spec_tokens = [c,d], - # prev_num_draft_len = 2. - # num_computed_tokens in first step and second step doesn't contain - # the spec tokens length, but in third step it contains the - # spec tokens length. we only need to update num_computed_tokens - # when prev_num_draft_len > 0. 
- if req_state.prev_num_draft_len: - if req_index is None: - req_state.prev_num_draft_len = 0 - else: - assert self.input_batch.prev_req_id_to_index is not None - prev_req_index = self.input_batch.prev_req_id_to_index[ - req_id] - num_accepted = valid_sampled_token_count[prev_req_index] - 1 - num_rejected = req_state.prev_num_draft_len - num_accepted - num_computed_tokens -= num_rejected - req_state.output_token_ids.extend([-1] * num_accepted) - req_state.num_computed_tokens = num_computed_tokens - - if not is_last_rank: - # When using PP, the scheduler sends the sampled tokens back, - # because there's no direct communication between the first- - # stage worker and the last-stage worker. - new_token_ids = req_data.new_token_ids[i] - # Add the sampled token(s) from the previous step (if any). - # This doesn't include "unverified" tokens like spec tokens. - num_new_tokens = (num_computed_tokens + len(new_token_ids) - - req_state.num_tokens) - if num_new_tokens == 1: - # Avoid slicing list in most common case. - req_state.output_token_ids.append(new_token_ids[-1]) - elif num_new_tokens > 0: - req_state.output_token_ids.extend( - new_token_ids[-num_new_tokens:]) - - # Update the block IDs. - if not resumed_from_preemption: - if new_block_ids is not None: - # Append the new blocks to the existing block IDs. - for block_ids, new_ids in zip(req_state.block_ids, - new_block_ids): - block_ids.extend(new_ids) - else: - assert new_block_ids is not None - # The request is resumed from preemption. - # Replace the existing block IDs with the new ones. - req_state.block_ids = new_block_ids - if req_index is None: - # The request is not in the persistent batch. - # The request was either preempted and resumed later, or was - # not scheduled in the previous step and needs to be added - # again. - - if self.use_async_scheduling and num_output_tokens > 0: - # We must recover the output token ids for resumed requests - # in the async scheduling case, so that correct input_ids - # are obtained. - resumed_token_ids = req_data.all_token_ids[req_id] - req_state.output_token_ids = resumed_token_ids[ - -num_output_tokens:] - - req_ids_to_add.append(req_id) - continue - - # Update the persistent batch. - self.input_batch.num_computed_tokens_cpu[req_index] = ( - num_computed_tokens) - if new_block_ids is not None: - self.input_batch.block_table.append_row( - new_block_ids, req_index) - - # For the last rank, we don't need to update the token_ids_cpu - # because the sampled tokens are already cached. - if not is_last_rank: - # Add new_token_ids to token_ids_cpu. - start_token_index = num_computed_tokens - end_token_index = num_computed_tokens + len(new_token_ids) - self.input_batch.token_ids_cpu[ - req_index, - start_token_index:end_token_index] = new_token_ids - self.input_batch.num_tokens_no_spec[ - req_index] = end_token_index - self.input_batch.num_tokens[req_index] = end_token_index - - # Add spec_token_ids to token_ids_cpu. - spec_token_ids = ( - scheduler_output.scheduled_spec_decode_tokens.get(req_id, ())) - num_spec_tokens = len(spec_token_ids) - if self.use_async_scheduling: - req_state.prev_num_draft_len = num_spec_tokens - if num_spec_tokens: - start_index = self.input_batch.num_tokens_no_spec[req_index] - end_token_index = start_index + num_spec_tokens - self.input_batch.token_ids_cpu[ - req_index, start_index:end_token_index] = spec_token_ids - # NOTE(woosuk): `num_tokens` here may include spec tokens. 
- self.input_batch.num_tokens[req_index] += num_spec_tokens - - # Add the new or resumed requests to the persistent batch. - # The smaller empty indices are filled first. - for req_id in req_ids_to_add: - req_state = self.requests[req_id] - self.input_batch.add_request(req_state) - - # Condense the batched states if there are gaps left by removed requests - self.input_batch.condense() - # Allow attention backend to reorder the batch, potentially - self._may_reorder_batch(scheduler_output) - # Refresh batch metadata with any pending updates. - self.input_batch.refresh_metadata() - - def _get_valid_sampled_token_count(self) -> list[int]: - # Wait until valid_sampled_tokens_count is copied to cpu, - prev_sampled_token_ids = self.input_batch.prev_sampled_token_ids - if (self.valid_sampled_token_count_event is None - or prev_sampled_token_ids is None): - return [] - - counts_cpu = self.valid_sampled_token_count_cpu - self.valid_sampled_token_count_event.synchronize() - return counts_cpu[:prev_sampled_token_ids.shape[0]].tolist() - - def _init_mrope_positions(self, req_state: CachedRequestState): - assert supports_mrope(self.model), "MROPE is not supported" - req_state.mrope_positions, req_state.mrope_position_delta = \ - self.model.get_mrope_input_positions( - req_state.prompt_token_ids, - req_state.mm_features, - ) - def _sync_metadata_across_dp( self, num_tokens: int, with_prefill: bool) -> tuple[int, Optional[torch.Tensor], bool]: @@ -986,31 +475,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): return self.model.unwrap() return self.model - def get_supported_generation_tasks(self) -> "list[GenerationTask]": - model = self.get_model() - supported_tasks = list[GenerationTask]() - - if is_text_generation_model(model): - supported_tasks.append("generate") - - if supports_transcription(model): - if model.supports_transcription_only: - return ["transcription"] - - supported_tasks.append("transcription") - - return supported_tasks - - def get_supported_tasks(self) -> "tuple[SupportedTask, ...]": - tasks = list[SupportedTask]() - - if self.model_config.runner_type == "generate": - tasks.extend(self.get_supported_generation_tasks()) - if self.model_config.runner_type == "pooling": - tasks.extend(self.get_supported_pooling_tasks()) - - return tuple(tasks) - def _make_attention_mask(self, attn_state) -> torch.Tensor: # pcp situation. 
if self.attn_mask_builder is None: @@ -1027,370 +491,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): return self.attn_mask_builder.get_mla_mask(self.dtype) return self.attn_mask_builder.get_splitfuse_attn_mask() - def _calc_mrope_positions(self, scheduler_output: "SchedulerOutput"): - mrope_pos_ptr = 0 - for index, req_id in enumerate(self.input_batch.req_ids): - req = self.requests[req_id] - assert req.mrope_positions is not None - - num_computed_tokens = \ - self.input_batch.num_computed_tokens_cpu[index] - num_scheduled_tokens = \ - scheduler_output.num_scheduled_tokens[req_id] - num_prompt_tokens = length_from_prompt_token_ids_or_embeds( - req.prompt_token_ids, req.prompt_embeds) - - if num_computed_tokens + num_scheduled_tokens > num_prompt_tokens: - prompt_part_len = max(0, - num_prompt_tokens - num_computed_tokens) - completion_part_len = max( - 0, num_scheduled_tokens - prompt_part_len) - else: - prompt_part_len = num_scheduled_tokens - completion_part_len = 0 - - assert num_scheduled_tokens == prompt_part_len + completion_part_len - - if prompt_part_len > 0: - # prompt's mrope_positions are pre-computed - dst_start = mrope_pos_ptr - dst_end = mrope_pos_ptr + prompt_part_len - src_start = num_computed_tokens - src_end = num_computed_tokens + prompt_part_len - - self.mrope_positions_cpu[:, dst_start:dst_end] = \ - req.mrope_positions[:, src_start:src_end] - - mrope_pos_ptr += prompt_part_len - - if completion_part_len > 0: - # compute completion's mrope_positions on-the-fly - dst_start = mrope_pos_ptr - dst_end = mrope_pos_ptr + completion_part_len - MRotaryEmbedding.get_next_input_positions_tensor( - out=self.mrope_positions_np, - out_offset=dst_start, - mrope_position_delta=req.mrope_position_delta, - context_len=num_computed_tokens + prompt_part_len, - num_new_tokens=completion_part_len, - ) - - mrope_pos_ptr += completion_part_len - - def _execute_mm_encoder(self, scheduler_output: "SchedulerOutput"): - scheduled_encoder_inputs = scheduler_output.scheduled_encoder_inputs - if not scheduled_encoder_inputs: - return - - # Batch the multi-modal inputs. - mm_kwargs, mm_hashes_pos = self._batch_mm_kwargs_from_scheduler( - scheduler_output) - encoder_outputs = [] - model = cast(SupportsMultiModal, self.model) - mm_inputs = group_mm_kwargs_by_modality( - mm_kwargs, - device=self.device, - pin_memory=self.pin_memory, - merge_by_field_config=model.merge_by_field_config, - ) - for modality, num_items, mm_kwargs_group in mm_inputs: - # Run the encoder. - # `curr_group_outputs` is either of the following: - # 1. A tensor of shape (num_items, feature_size, hidden_size) - # in case feature_size is fixed across all multimodal items. - # 2. A list or tuple (length: num_items) of tensors, each of shape - # (feature_size, hidden_size) in case the feature size is dynamic - # depending on the input multimodal items. 
- curr_group_outputs = self.model.embed_multimodal(**mm_kwargs_group) - - sanity_check_mm_encoder_outputs( - curr_group_outputs, - expected_num_items=num_items, - ) - - for output in curr_group_outputs: - encoder_outputs.append(output) - - for (mm_hash, pos_info), output in zip(mm_hashes_pos, encoder_outputs): - self.encoder_cache[mm_hash] = scatter_mm_placeholders( - output, - is_embed=pos_info.is_embed, - ) - self.maybe_save_ec_to_connector(self.encoder_cache, mm_hash) - - def _batch_mm_kwargs_from_scheduler( - self, - scheduler_output: "SchedulerOutput", - ) -> tuple[list[MultiModalKwargsItem], list[tuple[str, PlaceholderRange]]]: - """Batch multimodal kwargs from scheduled encoder inputs. - - Args: - scheduler_output: The scheduler output containing scheduled encoder - inputs. - - Returns: - A tuple of (mm_kwargs, req_ids_pos) where: - - mm_kwargs: List of multimodal kwargs items to be batched - - mm_hashes_pos: List of (mm_hash, position_info) tuples - """ - scheduled_encoder_inputs = scheduler_output.scheduled_encoder_inputs - if not scheduled_encoder_inputs: - return [], [] - # Batch the multi-modal inputs. - mm_kwargs = list[MultiModalKwargsItem]() - # list of tuple (mm_hash, position_info) - mm_hashes_pos = list[tuple[str, PlaceholderRange]]() - for req_id, encoder_input_ids in scheduled_encoder_inputs.items(): - req_state = self.requests[req_id] - assert req_state.mm_features is not None - for mm_input_id in encoder_input_ids: - mm_feature = req_state.mm_features[mm_input_id] - mm_hash = mm_feature.identifier - mm_kwargs.append(mm_feature.data) - mm_hashes_pos.append((mm_hash, mm_feature.mm_position)) - - return mm_kwargs, mm_hashes_pos - - def _gather_mm_embeddings( - self, - scheduler_output: "SchedulerOutput", - shift_computed_tokens: int = 0, - ) -> tuple[list[torch.Tensor], torch.Tensor]: - total_num_scheduled_tokens = scheduler_output.total_num_scheduled_tokens - - mm_embeds = list[torch.Tensor]() - is_mm_embed = self.is_mm_embed.cpu - is_mm_embed[:total_num_scheduled_tokens] = False - - req_start_idx = 0 - - for req_id in self.input_batch.req_ids: - mm_embeds_req: list[torch.Tensor] = [] - - num_scheduled_tokens = scheduler_output.num_scheduled_tokens[ - req_id] - req_state = self.requests[req_id] - num_computed_tokens = \ - req_state.num_computed_tokens + shift_computed_tokens - - for mm_feature in req_state.mm_features: # type: ignore - pos_info = mm_feature.mm_position - start_pos = pos_info.offset - num_encoder_tokens = pos_info.length - - # The encoder output is needed if the two ranges overlap: - # [num_computed_tokens, - # num_computed_tokens + num_scheduled_tokens) and - # [start_pos, start_pos + num_encoder_tokens) - if start_pos >= num_computed_tokens + num_scheduled_tokens: - # The encoder output is not needed in this step. - break - if start_pos + num_encoder_tokens <= num_computed_tokens: - # The encoder output is already processed and stored - # in the decoder's KV cache. - continue - - start_idx = max(num_computed_tokens - start_pos, 0) - end_idx = min( - num_computed_tokens - start_pos + num_scheduled_tokens, - num_encoder_tokens, - ) - assert start_idx < end_idx - - mm_hash = mm_feature.identifier - encoder_output = self.encoder_cache.get(mm_hash, None) - assert encoder_output is not None,\ - f"Encoder cache miss for {mm_hash}." 
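# Stand-alone sketch of the placeholder-overlap test spelled out in the
# comments above (pure Python; the function name is an assumption):
def placeholder_overlap(num_computed_tokens: int, num_scheduled_tokens: int,
                        start_pos: int, num_encoder_tokens: int):
    """Return the [start_idx, end_idx) slice of the encoder output needed in
    this step, or None if the placeholder lies outside the scheduled window."""
    if start_pos >= num_computed_tokens + num_scheduled_tokens:
        return None  # placeholder starts after this step's window
    if start_pos + num_encoder_tokens <= num_computed_tokens:
        return None  # placeholder is already fully in the KV cache
    start_idx = max(num_computed_tokens - start_pos, 0)
    end_idx = min(num_computed_tokens - start_pos + num_scheduled_tokens,
                  num_encoder_tokens)
    return start_idx, end_idx

# e.g. 16 computed + 8 scheduled tokens against a placeholder at offset 20 of
# length 10: only the slice (0, 4) of that encoder output is needed this step.
assert placeholder_overlap(16, 8, 20, 10) == (0, 4)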
- - if (is_embed := pos_info.is_embed) is not None: - is_embed = is_embed[start_idx:end_idx] - - req_start_pos = req_start_idx + start_pos - num_computed_tokens - is_mm_embed[req_start_pos+start_idx:req_start_pos + end_idx] \ - = True if is_embed is None else is_embed - - mm_embeds_item = gather_mm_placeholders( - encoder_output[start_idx:end_idx], - is_embed=is_embed, - ) - mm_embeds_req.append(mm_embeds_item) - - mm_embeds.extend(mm_embeds_req) - req_start_idx += num_scheduled_tokens - - is_mm_embed = self.is_mm_embed.copy_to_gpu(total_num_scheduled_tokens) - - return mm_embeds, is_mm_embed - - def _get_cumsum_and_arange( - self, - num_tokens: np.ndarray, - cumsum_dtype: Optional[np.dtype] = None, - ) -> tuple[np.ndarray, np.ndarray]: - """Get the cumulative sum and batched arange of the given array. - # E.g., [2, 5, 3] -> ([2, 7, 10], [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]) - # Equivalent to but faster than: - # np.concatenate([np.arange(n) for n in num_tokens]) - """ - # Step 1. [2, 5, 3] -> [2, 7, 10] - cu_num_tokens = np.cumsum(num_tokens, dtype=cumsum_dtype) - total_num_tokens = cu_num_tokens[-1] - # Step 2. [2, 7, 10] -> [0, 0, 2, 2, 2, 2, 2, 7, 7, 7] - cumsums_offsets = np.repeat(cu_num_tokens - num_tokens, num_tokens) - # Step 3. [0, 1, 0, 1, 2, 3, 4, 0, 1, 2] - arange = self.arange_np[:total_num_tokens] - cumsums_offsets - - return cu_num_tokens, arange - - def _prepare_input_ids(self, scheduler_output: "SchedulerOutput", - total_num_scheduled_tokens: int, - cu_num_tokens: np.ndarray) -> None: - """Prepare the input IDs for the current batch. - - Carefully handles the `prev_sampled_token_ids` which can be cached - from the previous engine iteration, in which case those tokens on the - NPU need to be copied into the corresponding slots into input_ids.""" - - if self.input_batch.prev_sampled_token_ids is None: - # Normal scheduling case - self.input_ids[:total_num_scheduled_tokens].copy_( - self.input_ids_cpu[:total_num_scheduled_tokens], - non_blocking=True) - if self.is_multimodal_model or self.enable_prompt_embeds: - self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) - self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) - return - - # Async scheduling case, where some decode requests from the previous - # iteration won't have entries in input_ids_cpu and need to be copied - # on the NPU from prev_sampled_token_ids. - prev_req_id_to_index = self.input_batch.prev_req_id_to_index - assert prev_req_id_to_index is not None - sample_flattened_indices: list[int] = [] - spec_flattened_indices: list[int] = [] - prev_common_req_indices: list[int] = [] - prev_draft_token_indices: list[int] = [] - indices_match = True - max_flattened_index = -1 - total_num_spec_tokens = 0 - scheduled_spec_tokens = scheduler_output.scheduled_spec_decode_tokens - for req_id, cur_index in self.input_batch.req_id_to_index.items(): - if (prev_index := prev_req_id_to_index.get(req_id)) is not None: - prev_common_req_indices.append(prev_index) - # We need to compute the flattened input_ids index of the - # last token in each common request. 
- draft_len = len(scheduled_spec_tokens.get(req_id, ())) - total_num_spec_tokens += draft_len - flattened_index = cu_num_tokens[cur_index].item() - 1 - # example: cu_num_tokens = [2, 5, 8], draft_tokens = [1, 2, 2] - # sample_flattened_indices = [0, 2, 5] - # spec_flattened_indices = [1, 3, 4, 6, 7] - sample_flattened_indices.append(flattened_index - draft_len) - spec_flattened_indices.extend( - range(flattened_index - draft_len + 1, - flattened_index + 1)) - start = prev_index * self.num_spec_tokens - # prev_draft_token_indices is used to find which draft_tokens_id - # should be copied to input_ids - # example: prev draft_tokens_id [[1,2], [3,4], [5, 6]] - # flatten draft_tokens_id [1,2,3,4,5,6] - # draft_len of each request [1, 2, 1] - # then prev_draft_token_indices is [0, 2, 3, 4] - prev_draft_token_indices.extend(range(start, - start + draft_len)) - indices_match &= prev_index == flattened_index - max_flattened_index = max(max_flattened_index, flattened_index) - num_commmon_tokens = len(sample_flattened_indices) - total_without_spec = (total_num_scheduled_tokens - - total_num_spec_tokens) - if num_commmon_tokens < total_without_spec: - # If not all requests are decodes from the last iteration, - # We need to copy the input_ids_cpu to the NPU first. - self.input_ids[:total_num_scheduled_tokens].copy_( - self.input_ids_cpu[:total_num_scheduled_tokens], - non_blocking=True) - if self.is_multimodal_model or self.enable_prompt_embeds: - self.inputs_embeds.copy_to_gpu(total_num_scheduled_tokens) - self.is_token_ids.copy_to_gpu(total_num_scheduled_tokens) - if num_commmon_tokens == 0: - # No requests in common with the previous iteration - # So input_ids_cpu will have all the input ids. - return - if indices_match and max_flattened_index == (num_commmon_tokens - 1): - # Common-case optimization: the batch is unchanged - # and no reordering happened. - # The indices are both the same permutation of 0..N-1 so - # we can copy directly using a single slice. - self.input_ids[:num_commmon_tokens].copy_( - self.input_batch.prev_sampled_token_ids[:num_commmon_tokens, - 0], - non_blocking=True) - self.is_token_ids.gpu[:num_commmon_tokens] = True - return - # Upload the index tensors asynchronously so the scatter can be non-blocking. - sampled_tokens_index_tensor = torch.tensor( - sample_flattened_indices, - dtype=torch.int64, - pin_memory=self.pin_memory).to(self.device, non_blocking=True) - prev_common_req_indices_tensor = torch.tensor( - prev_common_req_indices, - dtype=torch.int64, - pin_memory=self.pin_memory).to(self.device, non_blocking=True) - self.input_ids.scatter_( - dim=0, - index=sampled_tokens_index_tensor, - src=self.input_batch.prev_sampled_token_ids[ - prev_common_req_indices_tensor, 0], - ) - - # scatter the draft tokens after the sampled tokens are scattered. - if self._draft_token_ids is None or not spec_flattened_indices: - return - - assert isinstance(self._draft_token_ids, torch.Tensor) - draft_tokens_index_tensor = torch.tensor( - spec_flattened_indices, - dtype=torch.int64, - pin_memory=self.pin_memory).to(self.device, non_blocking=True) - prev_draft_token_indices_tensor = torch.tensor( - prev_draft_token_indices, - dtype=torch.int64, - pin_memory=self.pin_memory).to(self.device, non_blocking=True) - - # because input_ids dtype is torch.int32, - # so convert draft_token_ids to torch.int32 here. 
- draft_token_ids = self._draft_token_ids.to(dtype=torch.int32) - self._draft_token_ids = None - self.input_ids.scatter_( - dim=0, - index=draft_tokens_index_tensor, - src=draft_token_ids.flatten()[prev_draft_token_indices_tensor], - ) - - def _may_reorder_batch(self, scheduler_output: "SchedulerOutput") -> None: - """ - Update the order of requests in the batch based on the attention - backend's needs. For example, some attention backends (namely MLA) may - want to separate requests based on if the attention computation will be - compute-bound or memory-bound. - - Args: - scheduler_output: The scheduler output. - """ - # Attention free models have zero kv_cache_goups, however models - # like Mamba are also attention free but use the kv_cache for - # keeping its internal state. This is why we check the number - # of kv_cache groups instead of solely checking - # for self.model_config.is_attention_free. - if len(self.kv_cache_config.kv_cache_groups) == 0: - return - - if self.reorder_batch_threshold is not None: - reorder_batch_to_split_decodes_and_prefills( - self.input_batch, - scheduler_output, - decode_threshold=self.reorder_batch_threshold) - def generate_kv_idx(self, scheduler_output): if not self.pcp_size > 1: return @@ -1488,7 +588,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): dtype=np.int32) if (self.use_aclgraph and total_num_scheduled_tokens - <= self.aclgraph_batch_sizes[-1]): + <= self.cudagraph_batch_sizes[-1]): # Add padding to the batch size. num_input_tokens = self.vllm_config.pad_for_cudagraph( total_num_scheduled_tokens) @@ -1542,12 +642,12 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): num_scheduled_tokens) if self.pcp_size > 1: - positions_np = self.positions_np[:total_num_scheduled_tokens] + positions_np = self.positions.np[:total_num_scheduled_tokens] np.add(self.input_batch.num_computed_tokens_cpu[req_indices], position_pcp[:total_num_scheduled_tokens], out=positions_np) else: - self.positions_np[:total_num_scheduled_tokens] = positions_np + self.positions.np[:total_num_scheduled_tokens] = positions_np # Calculate M-RoPE positions. # Only relevant for models using M-RoPE (e.g, Qwen2-VL) @@ -1555,8 +655,8 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self._calc_mrope_positions(scheduler_output) # Only relevant for models using M-RoPE (e.g, Qwen2-VL) - self.mrope_positions[:, :total_num_scheduled_tokens].copy_( - self.mrope_positions_cpu[:, :total_num_scheduled_tokens], + self.mrope_positions.gpu[:, :total_num_scheduled_tokens].copy_( + self.mrope_positions.cpu[:, :total_num_scheduled_tokens], non_blocking=True) # Get token indices. 
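# Two self-contained NumPy/Python sketches of the indexing tricks documented
# in the removed helpers above (illustrative only; names are assumptions).
import numpy as np

def cumsum_and_arange(num_tokens: np.ndarray):
    # _get_cumsum_and_arange: [2, 5, 3] -> ([2, 7, 10], [0,1, 0,1,2,3,4, 0,1,2])
    cu_num_tokens = np.cumsum(num_tokens)
    offsets = np.repeat(cu_num_tokens - num_tokens, num_tokens)
    return cu_num_tokens, np.arange(cu_num_tokens[-1]) - offsets

def split_sample_and_spec_indices(cu_num_tokens, draft_lens):
    # _prepare_input_ids: per request, the last scheduled position minus the
    # draft length is the sampled-token slot; the trailing draft_len positions
    # are the speculative slots.
    sample_indices, spec_indices = [], []
    for cu, draft_len in zip(cu_num_tokens, draft_lens):
        last = cu - 1
        sample_indices.append(last - draft_len)
        spec_indices.extend(range(last - draft_len + 1, last + 1))
    return sample_indices, spec_indices

cu, ar = cumsum_and_arange(np.array([2, 5, 3]))
assert cu.tolist() == [2, 7, 10]
assert ar.tolist() == [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
# cu_num_tokens = [2, 5, 8], draft lens = [1, 2, 2] (the worked example above)
assert split_sample_and_spec_indices([2, 5, 8], [1, 2, 2]) == \
    ([0, 2, 5], [1, 3, 4, 6, 7])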
@@ -1573,13 +673,14 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(), 0, token_indices_tensor, - out=self.input_ids_cpu[:total_num_scheduled_tokens]) - is_token_ids = self.input_batch.is_token_ids.flatten() - torch.index_select( - is_token_ids, - 0, - token_indices_tensor, - out=self.is_token_ids.cpu[:total_num_scheduled_tokens]) + out=self.input_ids.cpu[:total_num_scheduled_tokens]) + if self.enable_prompt_embeds: + is_token_ids = self.input_batch.is_token_ids_tensor.flatten() + torch.index_select( + is_token_ids, + 0, + token_indices_tensor, + out=self.is_token_ids.cpu[:total_num_scheduled_tokens]) # Because we did not pre-allocate a massive prompt_embeds CPU tensor on # the InputBatch, we need to fill in the prompt embeds into the expected @@ -1621,29 +722,26 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): output_idx += num_sched - self.query_start_loc_np[0] = 0 - self.query_start_loc_np[1:num_reqs + 1] = cu_num_tokens - self.query_start_loc[:num_reqs + 1].copy_( - self.query_start_loc_cpu[:num_reqs + 1], non_blocking=True) + self.query_start_loc.np[0] = 0 + self.query_start_loc.np[1:num_reqs + 1] = cu_num_tokens + self.query_start_loc.copy_to_gpu() - self.seq_lens_np[:num_reqs] = ( + self.seq_lens.np[:num_reqs] = ( self.input_batch.num_computed_tokens_cpu[:num_reqs] + num_scheduled_tokens) - self.seq_lens[:num_reqs].copy_(self.seq_lens_cpu[:num_reqs], - non_blocking=True) + self.seq_lens.copy_to_gpu() # Fill unused with -1. Needed for reshape_and_cache - self.query_start_loc[num_reqs + 1:].fill_(-1) - self.seq_lens[num_reqs:].fill_(0) + self.query_start_loc.gpu[num_reqs + 1:].fill_(-1) + self.seq_lens.gpu[num_reqs:].fill_(0) self.query_lens = torch.from_numpy(num_scheduled_tokens) # Copy the tensors to the NPU. self._prepare_input_ids(scheduler_output, total_num_scheduled_tokens, cu_num_tokens) - self.positions_cpu[total_num_scheduled_tokens:num_input_tokens].zero_() - self.positions[:num_input_tokens].copy_( - self.positions_cpu[:num_input_tokens], non_blocking=True) + self.positions.cpu[total_num_scheduled_tokens:num_input_tokens].zero_() + self.positions.copy_to_gpu() attn_state = self._build_attn_state(num_reqs, num_scheduled_tokens, num_valid_tokens) @@ -1672,7 +770,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): np.array(tokens_original, dtype=np.int32)) discard_requests_mask = original_seq_lens_np < num_tokens_np else: - discard_requests_mask = self.seq_lens_np[:num_reqs] < num_tokens_np + discard_requests_mask = self.seq_lens.np[:num_reqs] < num_tokens_np discard_request_indices = np.nonzero(discard_requests_mask)[0] self.num_discarded_requests = len(discard_request_indices) @@ -1693,7 +791,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # NOTE(woosuk): To unify token ids and soft tokens (vision # embeddings), we always use embeddings (rather than token ids) # as input to the multimodal model, even when the input is text. 
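# The .np / .cpu / .gpu / copy_to_gpu() accesses introduced in this hunk
# follow a paired host/device buffer pattern. A simplified stand-in is
# sketched below -- it is NOT the actual vLLM utility, just the shape of the
# idea (a device tensor mirroring a CPU tensor plus a NumPy view of it).
import numpy as np
import torch

class PairedBuffer:
    def __init__(self, *size: int, dtype: torch.dtype, device: str):
        self.cpu = torch.zeros(*size, dtype=dtype, device="cpu")
        self.np = self.cpu.numpy()                 # shares the CPU storage
        self.gpu = torch.zeros_like(self.cpu, device=device)

    def copy_to_gpu(self, n: int | None = None) -> torch.Tensor:
        if n is None:
            self.gpu.copy_(self.cpu, non_blocking=True)
            return self.gpu
        self.gpu[:n].copy_(self.cpu[:n], non_blocking=True)
        return self.gpu[:n]

# Hypothetical usage mirroring the calls above:
#   positions = PairedBuffer(max_num_tokens, dtype=torch.int64, device="npu")
#   positions.np[:4] = [0, 1, 2, 3]
#   positions.copy_to_gpu(4)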
- input_ids = self.input_ids[:total_num_scheduled_tokens] + input_ids = self.input_ids.gpu[:total_num_scheduled_tokens] mm_embeds, is_mm_embed = self._gather_mm_embeddings( scheduler_output) @@ -1726,7 +824,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): .squeeze(1) # Some tokens ids may need to become embeds if token_ids_idx.numel() > 0: - token_ids = self.input_ids[token_ids_idx] + token_ids = self.input_ids.gpu[token_ids_idx] tokens_to_embeds = self.model.embed_input_ids( input_ids=token_ids) self.inputs_embeds.gpu[token_ids_idx] = tokens_to_embeds @@ -1738,13 +836,13 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # While it is possible to use embeddings as input just like the # multimodal models, it is not desirable for performance since # then the embedding layer is not included in the ACL graph. - input_ids = self.input_ids[:num_input_tokens] + input_ids = self.input_ids.gpu[:num_input_tokens] inputs_embeds = None - - positions = self.positions[:num_input_tokens] + positions = self.positions.gpu[:num_input_tokens] if self.uses_mrope: - positions = self.mrope_positions[:, :num_input_tokens] + positions = self.mrope_positions.gpu[:, :num_input_tokens] + # type: ignore if get_pp_group().is_first_rank: intermediate_tensors = None else: @@ -1785,7 +883,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): logits_indices = logits_indices.pin_memory().to( self.device, non_blocking=True) else: - logits_indices = self.query_start_loc[1:num_reqs + 1] - 1 + logits_indices = self.query_start_loc.gpu[1:num_reqs + 1] - 1 else: # Get the number of draft tokens for each request. # Iterate over the dictionary rather than all requests since not all @@ -1858,12 +956,12 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): else: blk_table = self.input_batch.block_table[kv_cache_group_id] blk_table_tensor = blk_table.get_device_tensor() - blk_table.slot_mapping[slot_mapping_size:].fill_(0) + blk_table.slot_mapping.gpu[slot_mapping_size:].fill_(0) if self.pcp_size > 1: - slot_mapping_for_pcp = blk_table.slot_mapping[: - long_seq_metadata - . - num_actual_tokens_pcp_padded] + slot_mapping_for_pcp = blk_table.slot_mapping.gpu[: + long_seq_metadata + . + num_actual_tokens_pcp_padded] slot_mapping_for_pcp[slot_mapping_size:].fill_(-1) assert pcp_unpad_mask is not None pcp_padded_slot_mapping = self.pcp_padded_slot_mapping[: @@ -1877,9 +975,9 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): slot_mapping_size] slot_mapping_for_pcp[:long_seq_metadata. 
num_actual_tokens_pcp_padded] = pcp_padded_slot_mapping - blk_table.slot_mapping[:long_seq_metadata.num_actual_tokens_pcp_padded] = \ + blk_table.slot_mapping.gpu[:long_seq_metadata.num_actual_tokens_pcp_padded] = \ slot_mapping_for_pcp - slot_mapping = blk_table.slot_mapping + slot_mapping = blk_table.slot_mapping.gpu # NOTE: This is a temporary hack, now in GPUModelRunner, this prepare_inputs # has been split to multiple parts, and there are 3 parts that is related to this @@ -1898,27 +996,27 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): num_reqs_padded = num_input_tokens // self.uniform_decode_query_len pad_size = num_reqs_padded - num_reqs if pad_size > 0: - last_query_loc = self.query_start_loc[num_reqs] + last_query_loc = self.query_start_loc.gpu[num_reqs] steps = torch.arange(1, pad_size + 1, device=self.device, - dtype=self.query_start_loc.dtype) + dtype=self.query_start_loc.gpu.dtype) fill_values = last_query_loc + ( steps * self.uniform_decode_query_len) - self.query_start_loc[num_reqs + 1:num_reqs_padded + - 1] = fill_values + self.query_start_loc.gpu[num_reqs + 1:num_reqs_padded + + 1] = fill_values # So we are trying to simulate the behavior of GPUModelRunner's # prepare_inputs for uniform decode mode by padding query_start_loc num_reqs = num_reqs_padded # Make AscendCommonAttentionMetadata common_attn_metadata = AscendCommonAttentionMetadata( - query_start_loc=self.query_start_loc[:num_reqs + 1], - query_start_loc_cpu=self.query_start_loc_cpu[:num_reqs + 1], - seq_lens_cpu=self.seq_lens_cpu[:num_reqs], - seq_lens=self.seq_lens[:num_reqs], + query_start_loc=self.query_start_loc.gpu[:num_reqs + 1], + query_start_loc_cpu=self.query_start_loc.cpu[:num_reqs + 1], + seq_lens_cpu=self.seq_lens.cpu[:num_reqs], + seq_lens=self.seq_lens.gpu[:num_reqs], num_reqs=num_reqs, num_actual_tokens=slot_mapping_size, num_input_tokens=num_input_tokens, @@ -1927,15 +1025,13 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): block_table_tensor=blk_table_tensor[:num_reqs], slot_mapping=slot_mapping, num_computed_tokens_cpu=num_computed_tokens_cpu, - positions=self.positions, + positions=self.positions.gpu, attn_mask=self.attn_mask, spec_attn_mask=self.spec_attn_mask, attn_state=self.attn_state, is_only_prefill=bool(np.all(num_valid_tokens != 1)), max_query_len=max_num_scheduled_tokens, decode_token_per_req=self.decode_token_per_req, - cos=self.cos, - sin=self.sin, prefill_context_parallel_metadata=long_seq_metadata, ) @@ -1947,8 +1043,8 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # (num_reqs_d + num_reqs_p, max_num_blocks), # flattened block_table: [d0, d0, d1, d1, p0, p1, p2] # (num_reqs_d * decode_threshold + num_reqs_p, max_num_blocks), - ori_query_lens = self.query_start_loc_pcp_full_cpu[1:num_reqs+1] - \ - self.query_start_loc_pcp_full_cpu[:num_reqs] + ori_query_lens = self.query_start_loc_pcp_full.cpu[1:num_reqs+1] - \ + self.query_start_loc_pcp_full.cpu[:num_reqs] num_prefill_reqs = (ori_query_lens > self.decode_threshold).sum().item() num_decode_reqs = num_reqs - num_prefill_reqs @@ -2035,7 +1131,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): if len(token_type_id_requests) == 0: return model_kwargs - seq_lens = self.seq_lens[:num_reqs] + seq_lens = self.seq_lens.gpu[:num_reqs] token_type_ids = [] for i in range(num_reqs): @@ -2107,7 +1203,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): attn_state = AscendAttentionState.PrefillNoCache else: 
attn_state = AscendAttentionState.PrefillCacheHit - elif np.array_equal(self.seq_lens_np[:num_reqs], num_scheduled_tokens): + elif np.array_equal(self.seq_lens.np[:num_reqs], num_scheduled_tokens): attn_state = AscendAttentionState.PrefillNoCache # We assume it is the decode stage, where prefill occurs but only one token is not hit in cache. elif np.all(num_scheduled_tokens == 1): @@ -2210,7 +1306,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # Compute the draft token ids. # draft_token_indices: [ 1, 2, 3, 105, 106, 208] - draft_token_ids = self.input_ids[logits_indices] + draft_token_ids = self.input_ids.gpu[logits_indices] draft_token_ids = draft_token_ids[target_logits_indices + 1] if self.pcp_size > 1: logits_indices = logits_indices_pcp @@ -2225,6 +1321,264 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): ) return metadata + def propose_draft_token_ids( + self, + valid_sampled_token_ids: torch.Tensor | list[list[int]], + sampling_metadata: SamplingMetadata, + scheduler_output: "SchedulerOutput", + spec_decode_metadata: SpecDecodeMetadata, + positions: torch.Tensor, + num_scheduled_tokens: int, + hidden_states: torch.Tensor, + attn_metadata: dict[str, Any], + aux_hidden_states: torch.Tensor = None, + ) -> Optional[list[list[int]]]: + if not self.drafter: + # Speculative decoding is not enabled. + draft_token_ids = None + else: + draft_token_ids = self.drafter.generate_token_ids( + valid_sampled_token_ids, sampling_metadata, scheduler_output, + spec_decode_metadata, positions, num_scheduled_tokens, + hidden_states, attn_metadata, aux_hidden_states) + return draft_token_ids + + def _select_moe_comm_method(self, + num_tokens: int) -> Optional[MoECommType]: + """1. If expert parallel is not enabled, we use all-gather since MC2 and all-to-all + are designed for expert parallelism. + 2. If expert parallel is enabled, we need to consider the soc version and the + number of tokens. This is based on the observation that all-gather is more + efficient than all-to-all when running on A2. + + a. For A2, we choose from MC2 and all-gather. + + b. For A3, we choose from MC2 and all-to-all. + + In both cases, we use MC2 when the number of tokens is smaller than + a its capacity threshold. + + Args: + num_tokens (int): The number of tokens in the current batch. + + Raises: + ValueError: If the soc version is unsupported. + + Returns: + MoECommType: The selected MoE communication method. 
+ """ + if not is_moe_model(self.vllm_config): + return None + mc2_tokens_capacity = get_mc2_tokens_capacity() + soc_version = get_ascend_device_type() + quant_type = getattr( + self.vllm_config.model_config.hf_config, 'moe_quantize', + getattr(self.vllm_config.model_config.hf_config, 'quantize', None)) + model_type = self.vllm_config.model_config.hf_config.model_type + + if not self.parallel_config.enable_expert_parallel: + moe_comm_type = MoECommType.ALLGATHER + elif soc_version in {AscendDeviceType._910B}: + if (num_tokens <= mc2_tokens_capacity + and self.parallel_config.world_size_across_dp / + self.parallel_config.pipeline_parallel_size >= 16): + moe_comm_type = MoECommType.MC2 + else: + # Currently, w4a8_dynamic does not support allgatherep + if quant_type == "w4a8_dynamic": + moe_comm_type = MoECommType.ALLTOALL + else: + moe_comm_type = MoECommType.ALLGATHER + + elif soc_version in {AscendDeviceType._910_93}: + moe_comm_type = ( + MoECommType.MC2 if num_tokens <= mc2_tokens_capacity else + MoECommType.FUSED_ALLTOALL if quant_type == "w8a8_dynamic" + and get_ep_group().world_size <= 16 else MoECommType.ALLTOALL) + else: + raise ValueError(f"Unsupported soc_version: {soc_version}") + + # PanguProMoE only supports allgather + if model_type == "PanguProMoE": + moe_comm_type = MoECommType.ALLGATHER + + if is_global_first_rank(): + logger.debug(f"num_tokens: {num_tokens}, " + f"moe_comm_type: {moe_comm_type}") + return moe_comm_type + + @staticmethod + def get_finished_kv_transfer( + scheduler_output: "SchedulerOutput", + ) -> tuple[Optional[set[str]], Optional[set[str]]]: + if has_kv_transfer_group(): + return get_kv_transfer_group().get_finished( + scheduler_output.finished_req_ids) + return None, None + + @torch.inference_mode() + def execute_model( + self, + scheduler_output: "SchedulerOutput", + intermediate_tensors: Optional[IntermediateTensors] = None, + ) -> Union[ModelRunnerOutput, IntermediateTensors] | None: + if self.execute_model_state is not None: + raise RuntimeError("State error: sample_tokens() must be called " + "after execute_model() returns None.") + + with ProfileExecuteDuration().capture_async("prepare input"): + self._update_states(scheduler_output) + if has_ec_transfer() and get_ec_transfer().is_producer: + with self.maybe_get_ec_connector_output( + scheduler_output, + encoder_cache=self.encoder_cache, + ): + self._execute_mm_encoder(scheduler_output) + return make_empty_encoder_model_runner_output( + scheduler_output) + + if not scheduler_output.total_num_scheduled_tokens: + if not has_kv_transfer_group(): + logger.debug( + "skip this step for we receive the data from remote disaggregate prefill node" + ) + # Return empty ModelRunnerOuptut if there's no work to do. 
+ return EMPTY_MODEL_RUNNER_OUTPUT + return self.kv_connector_no_forward(scheduler_output) + + if self.dynamic_eplb: + self.eplb_updator.forward_before() + + (attn_metadata, positions, num_scheduled_tokens_np, + num_input_tokens, num_tokens_across_dp, maybe_padded_num_tokens, + logits_indices, spec_decode_metadata, input_ids, inputs_embeds, + intermediate_tensors, + max_query_len) = (self._prepare_inputs(scheduler_output, + intermediate_tensors)) + + if self.dynamic_eplb: + self.eplb_updator.take_update_info_from_eplb_process() + + moe_comm_type = self._select_moe_comm_method(num_input_tokens) + # prevent debugger is None + need_dump = self.dump_enable and self.debugger is not None + if need_dump: + assert self.debugger is not None + dbg_cfg = getattr(self.debugger, "config", None) + dump_level = str( + getattr(dbg_cfg, "level", + "L1")).upper() if dbg_cfg is not None else "L1" + if dump_level in ("L0", "MIX"): + self.debugger.start(model=self.model) + else: + self.debugger.start() + + uniform_decode = (max_query_len == self.uniform_decode_query_len) and ( + scheduler_output.total_num_scheduled_tokens + == self.input_batch.num_reqs * max_query_len) + has_lora = len(self.input_batch.lora_id_to_lora_request) > 0 + aclgraph_runtime_mode, batch_descriptor = \ + self.cudagraph_dispatcher.dispatch(num_tokens=num_input_tokens, uniform_decode=uniform_decode, has_lora=has_lora) + + # Run forward pass + with ProfileExecuteDuration().capture_async("forward"): + with set_ascend_forward_context( + attn_metadata, + self.vllm_config, + num_tokens=num_input_tokens, + num_tokens_across_dp=num_tokens_across_dp, + with_prefill=self.with_prefill, + moe_comm_type=moe_comm_type, + aclgraph_runtime_mode=aclgraph_runtime_mode, + batch_descriptor=batch_descriptor, + num_actual_tokens=scheduler_output. + total_num_scheduled_tokens, + prefetch_stream=self.prefetch_stream, + model_instance=self.model, + weight_prefetch_method=self.weight_prefetch_method): + self.maybe_setup_kv_connector(scheduler_output) + + hidden_states = self._generate_process_reqs_hidden_states( + maybe_padded_num_tokens, input_ids, positions, + intermediate_tensors, inputs_embeds) + + self.maybe_wait_for_kv_save() + finished_sending, finished_recving = self.get_finished_kv_transfer( + scheduler_output) + + aux_hidden_states = None + if self.drafter and self.drafter.name == SpecDcodeType.EAGLE3: + hidden_states, aux_hidden_states = hidden_states + + kv_connector_output = KVConnectorOutput( + finished_sending=finished_sending, + finished_recving=finished_recving) + finished_sending = None + finished_recving = None + with ProfileExecuteDuration().capture_async("post process"): + # Broadcast PP output for external_launcher (torchrun) + # to make sure we are synced across pp ranks + # TODO: Support overlapping mirco-batches + # https://github.com/vllm-project/vllm/issues/18019 + broadcast_pp_output = \ + self.parallel_config.distributed_executor_backend \ + == "external_launcher" and len(get_pp_group().ranks) > 0 + if not get_pp_group().is_last_rank: + # For mid-pipeline stages, return the hidden states. 
+ if not broadcast_pp_output: + hidden_states.kv_connector_output = kv_connector_output + if need_dump: + assert self.debugger is not None + self.debugger.stop() + self.debugger.step() + return hidden_states + assert isinstance(hidden_states, IntermediateTensors) + get_pp_group().send_tensor_dict( + hidden_states.tensors, all_gather_group=get_tp_group()) + logits = None + else: + if self.input_batch.pooling_params: + pool_output = self._pool( + hidden_states, + scheduler_output.total_num_scheduled_tokens, + num_scheduled_tokens_np) + if need_dump: + assert self.debugger is not None + self.debugger.stop() + self.debugger.step() + return pool_output + # Sometimes, after the model is compiled through the AOT backend, + # the model output may become a list containing only one Tensor object. + if isinstance(hidden_states, list) and \ + len(hidden_states) == 1 and \ + isinstance(hidden_states[0], torch.Tensor): + hidden_states = hidden_states[0] + sample_hidden_states = hidden_states[logits_indices] + logits = self.model.compute_logits(sample_hidden_states) + if broadcast_pp_output: + model_output_broadcast_data = { + "logits": logits.contiguous(), + } if logits is not None else {} + model_output_broadcast_data = get_pp_group( + ).broadcast_tensor_dict(model_output_broadcast_data, + src=len(get_pp_group().ranks) - 1) + assert model_output_broadcast_data is not None + logits = model_output_broadcast_data["logits"] + + # Apply structured output bitmasks if present + self.execute_model_state = ExecuteModelState( + scheduler_output, + logits, + spec_decode_metadata, + hidden_states, + sample_hidden_states, + aux_hidden_states, + kv_connector_output, + attn_metadata, + positions, + ) + return None + def apply_grammar_bitmask( self, scheduler_output: "SchedulerOutput", @@ -2289,305 +1643,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): ) return logits.to(self.device).to(logits_dtype) - def propose_draft_token_ids( - self, - valid_sampled_token_ids: torch.Tensor | list[list[int]], - sampling_metadata: SamplingMetadata, - scheduler_output: "SchedulerOutput", - spec_decode_metadata: SpecDecodeMetadata, - positions: torch.Tensor, - num_scheduled_tokens: int, - hidden_states: torch.Tensor, - attn_metadata: dict[str, Any], - aux_hidden_states: torch.Tensor = None, - ) -> Optional[list[list[int]]]: - if not self.drafter: - # Speculative decoding is not enabled. 
- draft_token_ids = None - else: - draft_token_ids = self.drafter.generate_token_ids( - valid_sampled_token_ids, sampling_metadata, scheduler_output, - spec_decode_metadata, positions, num_scheduled_tokens, - hidden_states, attn_metadata, aux_hidden_states) - return draft_token_ids - - def _pool( - self, - hidden_states: torch.Tensor, - num_scheduled_tokens: int, - num_scheduled_tokens_np: np.ndarray, - finished_sending: Optional[set[str]] = None, - finished_recving: Optional[set[str]] = None, - kv_connector_output: Optional["KVConnectorOutput"] = None, - ) -> ModelRunnerOutput: - assert self.input_batch.num_reqs ==\ - len(self.input_batch.pooling_params), \ - "Either all or none of the requests in" \ - " a batch must be pooling request" - - hidden_states = hidden_states[:num_scheduled_tokens] - pooling_metadata = self.input_batch.get_pooling_metadata() - pooling_metadata.build_pooling_cursor(num_scheduled_tokens_np.tolist(), - device=hidden_states.device) - seq_lens_cpu = self.seq_lens_cpu[:self.input_batch.num_reqs] - - model = cast(VllmModelForPooling, self.model) - raw_pooler_output = model.pooler( - hidden_states=hidden_states, - pooling_metadata=pooling_metadata, - ) - raw_pooler_output = json_map_leaves( - lambda x: x.to("cpu", non_blocking=True), - raw_pooler_output, - ) - torch.npu.synchronize() - - pooler_output: list[Optional[torch.Tensor]] = [] - for raw_output, seq_len, prompt_len in zip( - raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens): - output = raw_output if seq_len == prompt_len else None - pooler_output.append(output) - - return ModelRunnerOutput( - req_ids=self.input_batch.req_ids, - req_id_to_index=self.input_batch.req_id_to_index, - sampled_token_ids=[], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=pooler_output, - kv_connector_output=kv_connector_output, - ) - - def _select_moe_comm_method(self, - num_tokens: int) -> Optional[MoECommType]: - """1. If expert parallel is not enabled, we use all-gather since MC2 and all-to-all - are designed for expert parallelism. - 2. If expert parallel is enabled, we need to consider the soc version and the - number of tokens. This is based on the observation that all-gather is more - efficient than all-to-all when running on A2. - - a. For A2, we choose from MC2 and all-gather. - - b. For A3, we choose from MC2 and all-to-all. - - In both cases, we use MC2 when the number of tokens is smaller than - a its capacity threshold. - - Args: - num_tokens (int): The number of tokens in the current batch. - - Raises: - ValueError: If the soc version is unsupported. - - Returns: - MoECommType: The selected MoE communication method. 
- """ - if not is_moe_model(self.vllm_config): - return None - - soc_version = get_ascend_device_type() - quant_type = getattr( - self.vllm_config.model_config.hf_config, 'moe_quantize', - getattr(self.vllm_config.model_config.hf_config, 'quantize', None)) - model_type = self.vllm_config.model_config.hf_config.model_type - - if not self.parallel_config.enable_expert_parallel: - moe_comm_type = MoECommType.ALLGATHER - elif soc_version in {AscendDeviceType._910B}: - if (num_tokens <= self.mc2_tokens_capacity - and self.parallel_config.world_size_across_dp / - self.parallel_config.pipeline_parallel_size >= 16): - moe_comm_type = MoECommType.MC2 - else: - # Currently, w4a8_dynamic does not support allgatherep - if quant_type == "w4a8_dynamic": - moe_comm_type = MoECommType.ALLTOALL - else: - moe_comm_type = MoECommType.ALLGATHER - - elif soc_version in {AscendDeviceType._910_93}: - # TODO: drop the EP-size guard when dispatch_ffn_combine supports larger EP sizes - moe_comm_type = ( - MoECommType.MC2 if num_tokens <= self.mc2_tokens_capacity else - MoECommType.FUSED_ALLTOALL if quant_type == "w8a8_dynamic" - and get_ep_group().world_size <= 16 else MoECommType.ALLTOALL) - else: - raise ValueError(f"Unsupported soc_version: {soc_version}") - - # PanguProMoE only supports allgather - if model_type == "PanguProMoE": - moe_comm_type = MoECommType.ALLGATHER - - if is_global_first_rank(): - logger.debug(f"num_tokens: {num_tokens}, " - f"moe_comm_type: {moe_comm_type}") - return moe_comm_type - - @torch.inference_mode() - def execute_model( - self, - scheduler_output: "SchedulerOutput", - intermediate_tensors: Optional[IntermediateTensors] = None, - ) -> Union[ModelRunnerOutput, IntermediateTensors] | None: - if self.execute_model_state is not None: - raise RuntimeError("State error: sample_tokens() must be called " - "after execute_model() returns None.") - - with ProfileExecuteDuration().capture_async("prepare input"): - self._update_states(scheduler_output) - if has_ec_transfer() and get_ec_transfer().is_producer: - with self.maybe_get_ec_connector_output( - scheduler_output, - encoder_cache=self.encoder_cache, - ): - self._execute_mm_encoder(scheduler_output) - return make_empty_encoder_model_runner_output( - scheduler_output) - - if not scheduler_output.total_num_scheduled_tokens: - if not has_kv_transfer_group(): - logger.debug( - "skip this step for we receive the data from remote disaggregate prefill node" - ) - # Return empty ModelRunnerOuptut if there's no work to do. 
- return EMPTY_MODEL_RUNNER_OUTPUT - return self.kv_connector_no_forward(scheduler_output) - - if self.dynamic_eplb: - self.eplb_updator.forward_before() - - (attn_metadata, positions, num_scheduled_tokens_np, - num_input_tokens, num_tokens_across_dp, maybe_padded_num_tokens, - logits_indices, spec_decode_metadata, input_ids, inputs_embeds, - intermediate_tensors, - max_query_len) = (self._prepare_inputs(scheduler_output, - intermediate_tensors)) - - if self.dynamic_eplb: - self.eplb_updator.take_update_info_from_eplb_process() - - moe_comm_type = self._select_moe_comm_method(num_input_tokens) - # prevent debugger is None - need_dump = self.dump_enable and self.debugger is not None - if need_dump: - assert self.debugger is not None - dbg_cfg = getattr(self.debugger, "config", None) - dump_level = str( - getattr(dbg_cfg, "level", - "L1")).upper() if dbg_cfg is not None else "L1" - if dump_level in ("L0", "MIX"): - self.debugger.start(model=self.model) - else: - self.debugger.start() - - uniform_decode = (max_query_len == self.uniform_decode_query_len) and ( - scheduler_output.total_num_scheduled_tokens - == self.input_batch.num_reqs * max_query_len) - has_lora = len(self.input_batch.lora_id_to_lora_request) > 0 - aclgraph_runtime_mode, batch_descriptor = \ - self.aclgraph_dispatcher.dispatch(num_tokens=num_input_tokens, uniform_decode=uniform_decode, has_lora=has_lora) - - # Run forward pass - with ProfileExecuteDuration().capture_async("forward"): - with set_ascend_forward_context( - attn_metadata, - self.vllm_config, - num_tokens=num_input_tokens, - num_tokens_across_dp=num_tokens_across_dp, - with_prefill=self.with_prefill, - reserved_mc2_mask=self.reserved_mc2_mask, - moe_comm_type=moe_comm_type, - aclgraph_runtime_mode=aclgraph_runtime_mode, - batch_descriptor=batch_descriptor, - num_actual_tokens=scheduler_output. - total_num_scheduled_tokens, - prefetch_stream=self.prefetch_stream, - model_instance=self.model, - weight_prefetch_method=self.weight_prefetch_method): - self.maybe_setup_kv_connector(scheduler_output) - - hidden_states = self._generate_process_reqs_hidden_states( - maybe_padded_num_tokens, input_ids, positions, - intermediate_tensors, inputs_embeds) - - self.maybe_wait_for_kv_save() - finished_sending, finished_recving = self.get_finished_kv_transfer( - scheduler_output) - - aux_hidden_states = None - if self.drafter and self.drafter.name == SpecDcodeType.EAGLE3: - hidden_states, aux_hidden_states = hidden_states - - kv_connector_output = KVConnectorOutput( - finished_sending=finished_sending, - finished_recving=finished_recving) - finished_sending = None - finished_recving = None - with ProfileExecuteDuration().capture_async("post process"): - # Broadcast PP output for external_launcher (torchrun) - # to make sure we are synced across pp ranks - # TODO: Support overlapping mirco-batches - # https://github.com/vllm-project/vllm/issues/18019 - broadcast_pp_output = \ - self.parallel_config.distributed_executor_backend \ - == "external_launcher" and len(get_pp_group().ranks) > 0 - if not get_pp_group().is_last_rank: - # For mid-pipeline stages, return the hidden states. 
- if not broadcast_pp_output: - hidden_states.kv_connector_output = kv_connector_output - if need_dump: - assert self.debugger is not None - self.debugger.stop() - self.debugger.step() - return hidden_states - assert isinstance(hidden_states, IntermediateTensors) - get_pp_group().send_tensor_dict( - hidden_states.tensors, all_gather_group=get_tp_group()) - logits = None - else: - if self.input_batch.pooling_params: - pool_output = self._pool( - hidden_states, - scheduler_output.total_num_scheduled_tokens, - num_scheduled_tokens_np, finished_sending, - finished_recving, kv_connector_output) - if need_dump: - assert self.debugger is not None - self.debugger.stop() - self.debugger.step() - return pool_output - # Sometimes, after the model is compiled through the AOT backend, - # the model output may become a list containing only one Tensor object. - if isinstance(hidden_states, list) and \ - len(hidden_states) == 1 and \ - isinstance(hidden_states[0], torch.Tensor): - hidden_states = hidden_states[0] - sample_hidden_states = hidden_states[logits_indices] - logits = self.model.compute_logits(sample_hidden_states) - if broadcast_pp_output: - model_output_broadcast_data = { - "logits": logits.contiguous(), - } if logits is not None else {} - model_output_broadcast_data = get_pp_group( - ).broadcast_tensor_dict(model_output_broadcast_data, - src=len(get_pp_group().ranks) - 1) - assert model_output_broadcast_data is not None - logits = model_output_broadcast_data["logits"] - - # Apply structured output bitmasks if present - self.execute_model_state = ExecuteModelState( - scheduler_output, - logits, - spec_decode_metadata, - hidden_states, - sample_hidden_states, - aux_hidden_states, - kv_connector_output, - attn_metadata, - positions, - ) - return None - @torch.inference_mode def sample_tokens( self, grammar_output: "GrammarOutput | None" @@ -2603,7 +1658,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): spec_decode_metadata, hidden_states, sample_hidden_states, - aux_hidden_states, + aux_hidden_states, # noqa kv_connector_output, attn_metadata, positions, @@ -2816,67 +1871,13 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): assert self.debugger is not None self.debugger.stop() self.debugger.step() - return AsyncNPUModelRunnerOutput( + return AsyncGPUModelRunnerOutput( model_runner_output=model_runner_output, sampled_token_ids=sampled_token_ids, invalid_req_indices=invalid_req_indices, async_output_copy_stream=self.async_output_copy_stream, - vocab_size=self.input_batch.vocab_size, ) - def take_draft_token_ids(self) -> Optional[DraftTokenIds]: - if self._draft_token_ids is None: - return None - req_ids = self.input_batch.req_ids - if isinstance(self._draft_token_ids, torch.Tensor): - draft_token_ids = self._draft_token_ids.tolist() - else: - draft_token_ids = self._draft_token_ids - self._draft_token_ids = None - return DraftTokenIds(req_ids, draft_token_ids) - - def kv_connector_no_forward( - self, scheduler_output: "SchedulerOutput") -> ModelRunnerOutput: - with set_ascend_forward_context(None, self.vllm_config): - self.maybe_setup_kv_connector(scheduler_output) - finished_sending, finished_recving = ( - self.get_finished_kv_transfer(scheduler_output)) - # For the case of no forward caused by receiving remote kv, - # one round of dummy inference is necessary - # to prevent hang over the collective calls. 
- - output = copy.copy(EMPTY_MODEL_RUNNER_OUTPUT) - output.kv_connector_output = KVConnectorOutput( - finished_sending=finished_sending, - finished_recving=finished_recving) - return output - - @staticmethod - def maybe_setup_kv_connector(scheduler_output: "SchedulerOutput"): - # Update KVConnector with the KVConnector metadata forward(). - if has_kv_transfer_group(): - kv_connector = get_kv_transfer_group() - assert isinstance(kv_connector, KVConnectorBase_V1) - assert scheduler_output.kv_connector_metadata is not None - kv_connector.bind_connector_metadata( - scheduler_output.kv_connector_metadata) - - kv_connector.start_load_kv(get_forward_context()) - - @staticmethod - def maybe_wait_for_kv_save() -> None: - if has_kv_transfer_group(): - get_kv_transfer_group().wait_for_save() - - @staticmethod - def get_finished_kv_transfer( - scheduler_output: "SchedulerOutput", - ) -> tuple[Optional[set[str]], Optional[set[str]]]: - if has_kv_transfer_group(): - return get_kv_transfer_group().get_finished( - scheduler_output.finished_req_ids) - return None, None - def _build_dummy_attn_metadata( self, with_prefill: bool, @@ -2896,18 +1897,15 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): attn_metadata = {} seq_lens = max_query_len - self.seq_lens_np[:num_reqs] = seq_lens - self.seq_lens_np[num_reqs:] = 0 - self.seq_lens[:num_reqs].copy_(self.seq_lens_cpu[:num_reqs], - non_blocking=True) + self.seq_lens.np[:num_reqs] = seq_lens + self.seq_lens.np[num_reqs:] = 0 + self.seq_lens.copy_to_gpu() cu_num_tokens, arange = self._get_cumsum_and_arange( num_scheduled_tokens) - self.query_start_loc_cpu[1:num_reqs + + self.query_start_loc.cpu[1:num_reqs + 1] = torch.Tensor(cu_num_tokens) - self.query_start_loc = self.query_start_loc_cpu.pin_memory().to( - self.device, non_blocking=True) self.query_lens = torch.from_numpy(num_scheduled_tokens) self.attn_mask = self.attn_mask_builder.get_splitfuse_attn_mask() @@ -2934,31 +1932,29 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # QUESTION: Why do we separately set query_start_loc for spec in the first place? # While in _prepare_inputs we don't? 
if self.speculative_config: - self.query_start_loc[:num_reqs + 1] = torch.tensor( + self.query_start_loc.gpu[:num_reqs + 1] = torch.tensor( [0] + self.actual_seq_lengths_q[:num_reqs], device=self.device, dtype=torch.int32) common_attn_metadata = AscendCommonAttentionMetadata( - query_start_loc=self.query_start_loc[:num_reqs + 1], - query_start_loc_cpu=self.query_start_loc_cpu[:num_reqs + + query_start_loc=self.query_start_loc.gpu[:num_reqs + 1], + query_start_loc_cpu=self.query_start_loc.cpu[:num_reqs + 1], - seq_lens_cpu=self.seq_lens_cpu, - seq_lens=self.seq_lens[:num_reqs], + seq_lens_cpu=self.seq_lens.cpu, + seq_lens=self.seq_lens.gpu[:num_reqs], num_reqs=num_reqs, num_actual_tokens=num_tokens, num_input_tokens=num_tokens, actual_seq_lengths_q=self.actual_seq_lengths_q, block_table_tensor=block_table_tensor[:num_reqs], - slot_mapping=slot_mapping, + slot_mapping=slot_mapping.gpu, num_computed_tokens_cpu=num_computed_tokens_cpu, - positions=self.positions, + positions=self.positions.gpu, attn_mask=self.attn_mask, spec_attn_mask=self.spec_attn_mask, attn_state=self.attn_state, max_query_len=max_query_len, decode_token_per_req=self.decode_token_per_req, - cos=self.cos, - sin=self.sin, prefill_context_parallel_metadata=long_seq_metadata, ) if self.pcp_size > 1: @@ -2970,15 +1966,15 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): attn_state = AscendAttentionState.SpecDecoding common_metadata = CommonAttentionMetadata( - query_start_loc=self.query_start_loc[:num_reqs + 1], - query_start_loc_cpu=self.query_start_loc_cpu[:num_reqs + + query_start_loc=self.query_start_loc.gpu[:num_reqs + 1], + query_start_loc_cpu=self.query_start_loc.cpu[:num_reqs + 1], - seq_lens_cpu=self.seq_lens_cpu[:num_reqs], - seq_lens=self.seq_lens_cpu[:num_reqs], + seq_lens_cpu=self.seq_lens.cpu[:num_reqs], + seq_lens=self.seq_lens.cpu[:num_reqs], num_reqs=num_reqs, num_actual_tokens=num_tokens, block_table_tensor=block_table_tensor[:num_reqs], - slot_mapping=slot_mapping, + slot_mapping=slot_mapping.gpu, num_computed_tokens_cpu=num_computed_tokens_cpu, max_query_len=max_query_len, max_seq_len=seq_lens) @@ -3113,7 +2109,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): has_lora = True if self.lora_config and self.compilation_config.cudagraph_specialize_lora else False _ag_mode, batch_descriptor = \ - self.aclgraph_dispatcher.dispatch(num_tokens=num_tokens, uniform_decode=uniform_decode, has_lora=has_lora) + self.cudagraph_dispatcher.dispatch(num_tokens=num_tokens, uniform_decode=uniform_decode, has_lora=has_lora) num_tokens_padded = batch_descriptor.num_tokens num_reqs_padded = (batch_descriptor.num_reqs if @@ -3160,13 +2156,13 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): input_ids = None inputs_embeds = self.inputs_embeds.gpu[:num_tokens_padded] else: - input_ids = self.input_ids[:num_tokens_padded] + input_ids = self.input_ids.gpu[:num_tokens_padded] inputs_embeds = None if self.uses_mrope: - positions = self.mrope_positions[:, :num_tokens_padded] + positions = self.mrope_positions.gpu[:, :num_tokens_padded] else: - positions = self.positions[:num_tokens_padded] + positions = self.positions.gpu[:num_tokens_padded] if get_pp_group().is_first_rank: intermediate_tensors = None @@ -3215,7 +2211,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): num_tokens_across_dp=num_tokens_across_dp, with_prefill=with_prefill, in_profile_run=self.in_profile_run, - reserved_mc2_mask=self.reserved_mc2_mask, + # 
reserved_mc2_mask=self.reserved_mc2_mask, moe_comm_type=moe_comm_type, num_actual_tokens=0, aclgraph_runtime_mode=aclgraph_runtime_mode, @@ -3263,10 +2259,10 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # MC2 will consume additional NPU memory. # Therefore, we need to run the MC2 path once here to complete its initialization, # allowing vLLM to correctly estimate the maximum memory required. - if self.max_num_tokens > self.mc2_tokens_capacity and \ - self._select_moe_comm_method( - self.mc2_tokens_capacity) == MoECommType.MC2: - self._dummy_run(self.mc2_tokens_capacity, with_prefill=True) + mc2_tokens_capacity = get_mc2_tokens_capacity() + if self.max_num_tokens > mc2_tokens_capacity and \ + self._select_moe_comm_method(mc2_tokens_capacity) == MoECommType.MC2: + self._dummy_run(mc2_tokens_capacity, with_prefill=True) output = None if get_pp_group().is_last_rank: @@ -3298,73 +2294,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self.encoder_cache.clear() gc.collect() - def _dummy_pooler_run_task( - self, - hidden_states: torch.Tensor, - task: PoolingTask, - ) -> PoolerOutput: - num_tokens = hidden_states.shape[0] - max_num_reqs = self.scheduler_config.max_num_seqs - num_reqs = min(num_tokens, max_num_reqs) - min_tokens_per_req = num_tokens // num_reqs - num_scheduled_tokens_list = [min_tokens_per_req] * num_reqs - num_scheduled_tokens_list[-1] += num_tokens % num_reqs - assert sum(num_scheduled_tokens_list) == num_tokens - assert len(num_scheduled_tokens_list) == num_reqs - - req_num_tokens = num_tokens // num_reqs - - dummy_token_ids = torch.zeros((num_reqs, req_num_tokens), - dtype=torch.int32, - device=self.device) - - model = cast(VllmModelForPooling, self.get_model()) - dummy_pooling_params = PoolingParams(task=task) - to_update = model.pooler.get_pooling_updates(task) - to_update.apply(dummy_pooling_params) - - dummy_prompt_lens = torch.tensor( - num_scheduled_tokens_list, - device="cpu", - ) - dummy_metadata = PoolingMetadata( - prompt_lens=dummy_prompt_lens, - prompt_token_ids=dummy_token_ids, - pooling_params=[dummy_pooling_params] * num_reqs, - ) - - dummy_metadata.build_pooling_cursor(num_scheduled_tokens_list, - device=hidden_states.device) - - try: - return model.pooler(hidden_states=hidden_states, - pooling_metadata=dummy_metadata) - except RuntimeError as e: - if 'out of memory' in str(e): - raise RuntimeError( - "CUDA out of memory occurred when warming up pooler " - f"({task=}) with {num_reqs} dummy requests. Please try " - "lowering `max_num_seqs` or `gpu_memory_utilization` when " - "initializing the engine.") from e - else: - raise e - - @torch.inference_mode() - def _dummy_pooler_run( - self, - hidden_states: torch.Tensor, - ) -> PoolerOutput: - # Find the task that has the largest output for subsequent steps - output_size = dict[PoolingTask, float]() - for task in self.get_supported_pooling_tasks(): - # Run a full batch with each task to ensure none of them OOMs - output = self._dummy_pooler_run_task(hidden_states, task) - output_size[task] = sum(o.nbytes for o in output) - del output # Allow GC - - max_task = max(output_size.items(), key=lambda x: x[1])[0] - return self._dummy_pooler_run_task(hidden_states, max_task) - def eplb_warmup(self): if self.dynamic_eplb and not self.is_eplb_warmuped: self.is_eplb_warmuped = True @@ -3807,7 +2736,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # of mamba block. 
In this case, BlockTable.block_size will never equal # to kernel_block_sizes[0] kernel_block_sizes.append([0]) - if block_sizes != [ self.cache_config.block_size ] or kernel_block_sizes != [[self.cache_config.block_size]]: @@ -3832,31 +2760,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): kernel_block_sizes=kernel_block_sizes, ) - def may_add_encoder_only_layers_to_kv_cache_config(self) -> None: - """ - Add encoder-only layers to the KV cache config. - """ - block_size = self.vllm_config.cache_config.block_size - encoder_only_attn_specs: dict[AttentionSpec, - list[str]] = defaultdict(list) - attn_layers = get_layers_from_vllm_config(self.vllm_config, Attention) - for layer_name, attn_module in attn_layers.items(): - if attn_module.attn_type == AttentionType.ENCODER_ONLY: - attn_spec: AttentionSpec = EncoderOnlyAttentionSpec( - block_size=block_size, - num_kv_heads=attn_module.num_kv_heads, - head_size=attn_module.head_size, - dtype=self.kv_cache_dtype) - encoder_only_attn_specs[attn_spec].append(layer_name) - self.runner_only_attn_layers.add(layer_name) - if len(encoder_only_attn_specs) > 0: - assert len( - encoder_only_attn_specs - ) == 1, "Only support one encoder-only attention spec now" - spec, layer_names = encoder_only_attn_specs.popitem() - self.kv_cache_config.kv_cache_groups.append( - KVCacheGroupSpec(layer_names=layer_names, kv_cache_spec=spec)) - def initialize_attn_backend(self, kv_cache_config: KVCacheConfig) -> None: """ Initialize the attention backends and attention metadata builders. @@ -3925,15 +2828,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): # Calculate reorder batch threshold (if needed) self.calculate_reorder_batch_threshold() - def _attn_group_iterator(self) -> Iterator[AttentionGroup]: - return itertools.chain.from_iterable(self.attn_groups) - - def _kv_cache_spec_attn_group_iterator(self) -> Iterator[AttentionGroup]: - if not self.kv_cache_config.kv_cache_groups: - return - for attn_groups in self.attn_groups: - yield from attn_groups - def calculate_reorder_batch_threshold(self) -> None: """ Check that if any backends reorder batches; that the reordering @@ -3941,12 +2835,13 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): """ for group in self._attn_group_iterator(): attn_metadata_builder_i = group.get_metadata_builder() - if hasattr(attn_metadata_builder_i, "reorder_batch_threshold"): + if hasattr(attn_metadata_builder_i, + "reorder_batch_threshold"): # noqa # check that if any backends reorder batches; that the reordering # is compatible (e.g., decode threshold is the same) reorder_batch_threshold_i = ( attn_metadata_builder_i.reorder_batch_threshold) - if reorder_batch_threshold_i is not None: + if reorder_batch_threshold_i is not None: # noqa if self.reorder_batch_threshold is not None: if reorder_batch_threshold_i != \ self.reorder_batch_threshold: @@ -3956,7 +2851,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): f"backend uses threshold " f"{self.reorder_batch_threshold}") else: - self.reorder_batch_threshold = reorder_batch_threshold_i + self.reorder_batch_threshold = reorder_batch_threshold_i # noqa def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]: """ @@ -4122,16 +3017,16 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): self.uniform_decode_query_len, self.parallel_config.tensor_parallel_size) capture_sizes = self.compilation_config.cudagraph_capture_sizes - self.aclgraph_batch_sizes = (capture_sizes - if 
capture_sizes is not None else []) + self.cudagraph_batch_sizes = (capture_sizes + if capture_sizes is not None else []) # NOTE: Since aclgraph_batch_sizes cannot be determined until here, # we set the graph params right before initializing the keys. - set_graph_params(self.aclgraph_batch_sizes) + set_graph_params(self.cudagraph_batch_sizes) if self.speculative_config: - set_mtp_graph_params(self.aclgraph_batch_sizes) + set_mtp_graph_params(self.cudagraph_batch_sizes) - self.aclgraph_dispatcher.initialize_cudagraph_keys( + self.cudagraph_dispatcher.initialize_cudagraph_keys( self.compilation_config.cudagraph_mode, self.uniform_decode_query_len) @@ -4199,7 +3094,7 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): aclgraph_runtime_mode = aclgraph_mode.mixed_mode() # make sure we capture the largest batch size first - compilation_cases = list(reversed(self.aclgraph_batch_sizes)) + compilation_cases = list(reversed(self.cudagraph_batch_sizes)) try: self._capture_aclgraphs( @@ -4231,8 +3126,8 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): max_num_tokens = self.scheduler_config.max_num_seqs * \ self.uniform_decode_query_len decode_cudagraph_batch_sizes = [ - x for x in self.aclgraph_batch_sizes if x <= max_num_tokens - and x >= self.uniform_decode_query_len + x for x in self.cudagraph_batch_sizes if + x <= max_num_tokens and x >= self.uniform_decode_query_len ] compilation_cases_decode = list( reversed(decode_cudagraph_batch_sizes)) @@ -4265,112 +3160,6 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): logger.info("Graph capturing finished in %.0f secs, took %.2f GiB", elapsed_time, npu_graph_size / (1 << 30)) - def _get_prompt_logprobs_dict( - self, - hidden_states: torch.Tensor, - scheduler_output: "SchedulerOutput", - ) -> dict[str, Optional[LogprobsTensors]]: - num_prompt_logprobs_dict = self.input_batch.num_prompt_logprobs - if not num_prompt_logprobs_dict: - return {} - - in_progress_dict = self.input_batch.in_progress_prompt_logprobs_cpu - prompt_logprobs_dict: dict[str, Optional[LogprobsTensors]] = {} - - # Since prompt logprobs are a rare feature, prioritize simple, - # maintainable loop over optimal performance. - completed_prefill_reqs = [] - for req_id, num_prompt_logprobs in num_prompt_logprobs_dict.items(): - - num_tokens = scheduler_output.num_scheduled_tokens[req_id] - - # Get metadata for this request. - request = self.requests[req_id] - if request.prompt_token_ids is None: - # Prompt logprobs is incompatible with prompt embeddings - continue - num_prompt_tokens = len(request.prompt_token_ids) - prompt_token_ids = torch.tensor(request.prompt_token_ids).to( - self.device, non_blocking=True) - - # Set up target LogprobsTensors object. - logprobs_tensors = in_progress_dict.get(req_id) - if not logprobs_tensors: - # Create empty logprobs CPU tensors for the entire prompt. - # If chunked, we'll copy in slice by slice. - logprobs_tensors = LogprobsTensors.empty_cpu( - num_prompt_tokens - 1, num_prompt_logprobs + 1) - in_progress_dict[req_id] = logprobs_tensors - - # Determine number of logits to retrieve. - start_idx = request.num_computed_tokens - start_tok = start_idx + 1 - num_remaining_tokens = num_prompt_tokens - start_tok - if num_tokens <= num_remaining_tokens: - # This is a chunk, more tokens remain. - # In the == case, there are no more prompt logprobs to produce - # but we want to defer returning them to the next step where we - # have new generated tokens to return. 
- num_logits = num_tokens - else: - # This is the last chunk of prompt tokens to return. - num_logits = num_remaining_tokens - if num_logits > 0: - completed_prefill_reqs.append(req_id) - prompt_logprobs_dict[req_id] = logprobs_tensors - - if num_logits <= 0: - # This can happen for the final chunk if we prefilled exactly - # (num_prompt_tokens - 1) tokens for this request in the prior - # step. There are no more prompt logprobs to produce. - continue - - # Get the logits corresponding to this req's prompt tokens. - # If this is a partial request (i.e. chunked prefill), - # then there is prompt logprob generated for each index. - req_idx = self.input_batch.req_id_to_index[req_id] - offset = self.query_start_loc_np[req_idx].item() - prompt_hidden_states = hidden_states[offset:offset + num_logits] - logits = self.model.compute_logits(prompt_hidden_states) - - # Get the "target" tokens for each index. For prompt at index i, - # the token at prompt index i+1 is the "sampled" token we want - # to gather the logprob for. - tgt_token_ids = prompt_token_ids[start_tok:start_tok + num_logits] - - # Compute prompt logprobs. - logprobs = self.sampler.compute_logprobs(logits) - token_ids, logprobs, ranks = self.sampler.gather_logprobs( - logprobs, num_prompt_logprobs, tgt_token_ids) - - # Transfer NPU->CPU async. - chunk_slice = slice(start_idx, start_idx + num_logits) - logprobs_tensors.logprob_token_ids[chunk_slice].copy_( - token_ids, non_blocking=True) - logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, - non_blocking=True) - logprobs_tensors.selected_token_ranks[chunk_slice].copy_( - ranks, non_blocking=True) - - # Remove requests that have completed prefill from the batch - # num_prompt_logprobs_dict. - for req_id in completed_prefill_reqs: - del num_prompt_logprobs_dict[req_id] - del in_progress_dict[req_id] - - # Must synchronize the non-blocking NPU->CPU transfers. - if prompt_logprobs_dict: - torch.npu.synchronize() - - return prompt_logprobs_dict - - def get_supported_pooling_tasks(self): - model = self.get_model() - if not is_pooling_model(model): - return [] - - return list(model.pooler.get_supported_tasks()) - def _update_tokens_for_pcp(self, tokens): num_reqs = self.input_batch.num_reqs self.num_pcp_pads = self.num_pcp_pads[:num_reqs] @@ -4637,10 +3426,10 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): req_indices_pcp_full = np.repeat(self.arange_np[:num_reqs], num_scheduled_tokens_pcp_full) cu_num_tokens_pcp_full = np.cumsum(num_scheduled_tokens_pcp_full) - self.query_start_loc_pcp_full_np[0] = 0 - self.query_start_loc_pcp_full_np[1:num_reqs + + self.query_start_loc_pcp_full.np[0] = 0 + self.query_start_loc_pcp_full.np[1:num_reqs + 1] = cu_num_tokens_pcp_full - self.query_start_loc_pcp_full_np[num_reqs + 1:].fill(-1) + self.query_start_loc_pcp_full.np[num_reqs + 1:].fill(-1) cumsums_offsets_pcp_full = np.repeat( cu_num_tokens_pcp_full - num_scheduled_tokens_pcp_full, num_scheduled_tokens_pcp_full) @@ -4654,17 +3443,50 @@ class NPUModelRunner(LoRAModelRunnerMixin, ECConnectorModelRunnerMixin): token_indices_pcp_full = ( positions_pcp_full_np + req_indices_pcp_full * self.input_batch.token_ids_cpu.shape[1]) - torch.index_select( - self.input_batch.token_ids_cpu_tensor.flatten(), - 0, - torch.from_numpy(token_indices_pcp_full), - out=self. 
-            input_ids_pcp_full_cpu[:total_num_scheduled_tokens_pcp_full])
-        self.query_start_loc_pcp_full[:num_reqs + 1].copy_(
-            self.query_start_loc_pcp_full_cpu[:num_reqs + 1],
-            non_blocking=True,
-        )
-        self.input_ids_pcp_full[:total_num_scheduled_tokens_pcp_full].copy_(
-            self.input_ids_pcp_full_cpu[:total_num_scheduled_tokens_pcp_full],
+        torch.index_select(self.input_batch.token_ids_cpu_tensor.flatten(),
+                           0,
+                           torch.from_numpy(token_indices_pcp_full),
+                           out=self.input_ids_pcp_full.
+                           cpu[:total_num_scheduled_tokens_pcp_full])
+        self.query_start_loc_pcp_full.copy_to_gpu()
+        self.input_ids_pcp_full.gpu[:total_num_scheduled_tokens_pcp_full].copy_(
+            self.input_ids_pcp_full.cpu[:total_num_scheduled_tokens_pcp_full],
             non_blocking=True,
         )
+
+
+@contextmanager
+def _torch_cuda_wrapper():
+
+    class _EventPlaceholder:
+
+        def __init__(self, *args, **kwargs) -> None:
+            self.record = lambda: None
+            self.synchronize = lambda: None
+
+    class _StreamPlaceholder:
+
+        def __init__(self, *args, **kwargs) -> None:
+            pass
+
+    try:
+        # replace cuda APIs with npu APIs; this should work by default
+        torch.cuda.Event = _EventPlaceholder
+        torch.cuda.Stream = torch.npu.Stream
+        torch.cuda.default_stream = torch.npu.default_stream
+        torch.cuda.current_stream = torch.npu.current_stream
+        torch.cuda.stream = torch.npu.stream
+        yield
+    except Exception:
+        torch.cuda.Event = _EventPlaceholder
+        torch.cuda.Stream = _StreamPlaceholder
+        torch.cuda.default_stream = _StreamPlaceholder
+        torch.cuda.current_stream = _StreamPlaceholder
+        torch.cuda.stream = _StreamPlaceholder
+    finally:
+        # always leave the npu mappings installed when the context exits
+        torch.cuda.Event = _EventPlaceholder
+        torch.cuda.Stream = torch.npu.Stream
+        torch.cuda.default_stream = torch.npu.default_stream
+        torch.cuda.current_stream = torch.npu.current_stream
+        torch.cuda.stream = torch.npu.stream
diff --git a/vllm_ascend/worker/npu_input_batch.py b/vllm_ascend/worker/npu_input_batch.py
index a6f8e3bd..d9db1566 100644
--- a/vllm_ascend/worker/npu_input_batch.py
+++ b/vllm_ascend/worker/npu_input_batch.py
@@ -114,6 +114,7 @@ class InputBatch:
         vocab_size: int,
         block_sizes: list[int],  # The block_size of each kv cache group
         logitsprocs: Optional[LogitsProcessors] = None,
+        logitsprocs_need_output_token_ids: bool = False,
         is_spec_decode: bool = False,
         is_pooling_model: bool = False,
         num_speculative_tokens: int = 0,
@@ -143,10 +144,11 @@ class InputBatch:
             pin_memory=False,
         )
         self.token_ids_cpu = self.token_ids_cpu_tensor.numpy()
-        self.is_token_ids = torch.zeros((max_num_reqs, max_model_len),
-                                        device="cpu",
-                                        dtype=bool,
-                                        pin_memory=False)
+        self.is_token_ids_tensor = torch.zeros((max_num_reqs, max_model_len),
+                                               device="cpu",
+                                               dtype=bool,
+                                               pin_memory=False)
+        self.is_token_ids = self.is_token_ids_tensor.numpy()
         # Store prompt embeddings per request to avoid OOM from large upfront
         # allocation if max_model_len is big.
         # Maps req_index -> tensor of shape (num_prompt_tokens, hidden_size)
@@ -299,6 +301,11 @@ class InputBatch:
         # Store provided logitsprocs. If none are provided, initialize empty
         # data structure
         self.logitsprocs = logitsprocs or LogitsProcessors()
+        self.logitsprocs_need_output_token_ids = logitsprocs_need_output_token_ids
+
+        # Store last speculative tokens for sampler.
+        self.spec_token_ids: list[list[int]] = [[]
+                                                for _ in range(max_num_reqs)]
         # This is updated each time the batch constituents change.
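The buffer changes above follow one recurring pattern: a CPU staging tensor is exposed through a NumPy view for cheap indexed writes, and the device side is refreshed once per step with a single non-blocking copy. Below is a minimal sketch of that pattern; the class and method names are illustrative assumptions for this note, not the vLLM buffer API.

```python
import numpy as np
import torch


class HostDeviceBuffer:
    """Illustrative pairing of a CPU staging tensor, its NumPy view,
    and a device tensor refreshed with one async copy per step."""

    def __init__(self, size: int, dtype=torch.int32, compute_device="cpu"):
        # CPU staging area; .numpy() shares memory with the tensor, so
        # writes through the NumPy alias are visible to torch at no cost.
        self.cpu = torch.zeros(size, dtype=dtype, device="cpu")
        self.np = self.cpu.numpy()
        # Device-side tensor ("cpu" by default so the sketch runs anywhere).
        self.gpu = torch.zeros(size, dtype=dtype, device=compute_device)

    def copy_to_gpu(self, n: int | None = None) -> torch.Tensor:
        # One staged, non-blocking host-to-device transfer of the live prefix.
        n = self.cpu.shape[0] if n is None else n
        self.gpu[:n].copy_(self.cpu[:n], non_blocking=True)
        return self.gpu[:n]


if __name__ == "__main__":
    buf = HostDeviceBuffer(8)
    buf.np[:4] = np.arange(4)   # fill via the NumPy alias
    buf.np[4:].fill(-1)         # pad the tail with a sentinel
    buf.copy_to_gpu()
    print(buf.gpu.tolist())
```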
self.sampling_metadata = self._make_sampling_metadata() @@ -306,9 +313,14 @@ class InputBatch: self.pooling_params: dict[str, PoolingParams] = {} # Cached reference to the GPU tensor of previously sampled tokens - self.prev_sampled_token_ids: Optional[torch.Tensor] = None + self.prev_sampled_token_ids: torch.Tensor | None = None self.prev_sampled_token_ids_invalid_indices: Optional[set[int]] = None - self.prev_req_id_to_index: Optional[dict[str, int]] = None + self.prev_req_id_to_index: dict[str, int] | None = None + # These are used to update output_token_ids with real sampled + # ids from prior step, if required by current sampling params + # (e.g. penalties). + self.sampled_token_ids_cpu: torch.Tensor | None = None + self.async_copy_ready_event: torch.Event | None = None @property def req_ids(self) -> list[str]: @@ -350,9 +362,11 @@ class InputBatch: if req_index == len(self._req_ids): self._req_ids.append(req_id) self.req_output_token_ids.append(request.output_token_ids) + self.spec_token_ids.append([]) else: self._req_ids[req_index] = req_id self.req_output_token_ids[req_index] = request.output_token_ids + self.spec_token_ids[req_index].clear() self.req_id_to_index[req_id] = req_index @@ -496,6 +510,21 @@ class InputBatch: self.batch_update_builder.removed_append(req_index) self._req_ids[req_index] = None self.req_output_token_ids[req_index] = None + self.spec_token_ids[req_index].clear() + + # LoRA + lora_id = self.request_lora_mapping[req_index] + if lora_id != 0: + lora_req_ids = self.lora_id_to_request_ids[lora_id] + lora_req_ids.discard(req_id) + if not lora_req_ids: + del self.lora_id_to_request_ids[lora_id] + del self.lora_id_to_lora_request[lora_id] + self.request_lora_mapping[req_index] = 0 + + if self.is_pooling_model: + self.pooling_params.pop(req_id, None) + return req_index self.greedy_reqs.discard(req_id) self.random_reqs.discard(req_id) @@ -510,6 +539,8 @@ class InputBatch: self.num_prompt_logprobs.pop(req_id, None) self.in_progress_prompt_logprobs_cpu.pop(req_id, None) + if self.prev_req_id_to_index is not None: + self.prev_req_id_to_index.pop(req_id, None) # LoRA lora_id = self.request_lora_mapping[req_index] if lora_id != 0: @@ -538,6 +569,10 @@ class InputBatch: self._req_ids[i2], self._req_ids[i1] # noqa self.req_output_token_ids[i1], self.req_output_token_ids[i2] =\ self.req_output_token_ids[i2], self.req_output_token_ids[i1] + self.spec_token_ids[i1], self.spec_token_ids[i2] = ( + self.spec_token_ids[i2], + self.spec_token_ids[i1], + ) assert old_id_i1 is not None and old_id_i2 is not None self.req_id_to_index[old_id_i1], self.req_id_to_index[old_id_i2] =\ self.req_id_to_index[old_id_i2], self.req_id_to_index[old_id_i1] @@ -629,6 +664,7 @@ class InputBatch: # The batched states are empty. self._req_ids.clear() self.req_output_token_ids.clear() + self.spec_token_ids.clear() return # NOTE(woosuk): This function assumes that the empty_req_indices @@ -662,6 +698,16 @@ class InputBatch: self.req_output_token_ids[last_req_index] = None self.req_id_to_index[req_id] = empty_index + if last_req_index != empty_index: + ( + self.spec_token_ids[last_req_index], + self.spec_token_ids[empty_index], + ) = ( + self.spec_token_ids[empty_index], + self.spec_token_ids[last_req_index], + ) + self.spec_token_ids[last_req_index].clear() + num_tokens = self.num_tokens[last_req_index] self.token_ids_cpu[empty_index, :num_tokens] = self.token_ids_cpu[ last_req_index, :num_tokens] @@ -714,6 +760,7 @@ class InputBatch: # Trim lists to the batch size. 
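Several of the InputBatch changes above exist to keep per-request parallel lists (request ids, output token ids, speculative token ids) aligned on the same row index when requests are removed and the batch is condensed. A simplified sketch of that swap-and-trim bookkeeping is shown below; the function and variable names are illustrative, not the actual InputBatch code.

```python
def condense(req_ids, output_token_ids, spec_token_ids, empty_indices):
    """Fill holes (None rows) by moving the last live request down, then
    trim every parallel list to the number of live requests."""
    last = len(req_ids) - 1
    for empty in sorted(empty_indices):
        # Walk past trailing holes; they are trimmed at the end anyway.
        while last > empty and req_ids[last] is None:
            last -= 1
        if last <= empty:
            break
        req_ids[empty], req_ids[last] = req_ids[last], None
        output_token_ids[empty], output_token_ids[last] = (
            output_token_ids[last], None)
        # Swap then clear so the vacated row keeps a reusable empty list.
        spec_token_ids[empty], spec_token_ids[last] = (
            spec_token_ids[last], spec_token_ids[empty])
        spec_token_ids[last].clear()
        last -= 1
    num_live = sum(r is not None for r in req_ids)
    del req_ids[num_live:]
    del output_token_ids[num_live:]
    del spec_token_ids[num_live:]


reqs = ["a", None, "b"]
outs = [[1], None, [2, 3]]
specs = [[9], [], [8]]
condense(reqs, outs, specs, empty_indices=[1])
print(reqs, outs, specs)  # ['a', 'b'] [[1], [2, 3]] [[9], [8]]
```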
del self._req_ids[num_reqs:] del self.req_output_token_ids[num_reqs:] + del self.spec_token_ids[num_reqs:] def refresh_metadata(self): """Apply any batch updates to sampling metadata.""" @@ -787,6 +834,7 @@ class InputBatch: presence_penalties=self.presence_penalties[:num_reqs], repetition_penalties=self.repetition_penalties[:num_reqs], output_token_ids=cast(list[list[int]], self.req_output_token_ids), + spec_token_ids=cast(list[list[int]], self.spec_token_ids), no_penalties=self.no_penalties, allowed_token_ids_mask=allowed_token_ids_mask, bad_words_token_ids=self.bad_words_token_ids, @@ -848,6 +896,53 @@ class InputBatch: return prompt_lora_mapping, token_lora_mapping, active_lora_requests + def set_async_sampled_token_ids( + self, + sampled_token_ids_cpu: torch.Tensor, + async_copy_ready_event: torch.Event, + ) -> None: + """ + In async scheduling case, store ref to sampled_token_ids_cpu + tensor and corresponding copy-ready event. Used to repair + output_token_ids prior to sampling, if needed by logits processors. + """ + if self.sampling_metadata.output_token_ids: + self.sampled_token_ids_cpu = sampled_token_ids_cpu + self.async_copy_ready_event = async_copy_ready_event + else: + self.sampled_token_ids_cpu = None + self.async_copy_ready_event = None + + def update_async_output_token_ids(self) -> None: + """ + In async scheduling case, update output_token_ids in sampling metadata + from prior steps sampled token ids once they've finished copying to CPU. + This is called right before they are needed by the logits processors. + """ + output_token_ids = self.sampling_metadata.output_token_ids + if self.sampled_token_ids_cpu is None or not output_token_ids: + # Output token ids not needed or not async scheduling. + return + + assert self.prev_req_id_to_index is not None + sampled_token_ids = None + for index, req_id in enumerate(self.req_ids): + prev_index = self.prev_req_id_to_index.get(req_id) + if prev_index is None: + continue + req_output_token_ids = output_token_ids[index] + if not req_output_token_ids or req_output_token_ids[-1] != -1: + # Final output id is not a placeholder, some tokens must have + # been discarded after a kv-load failure. + continue + if sampled_token_ids is None: + assert self.async_copy_ready_event is not None + self.async_copy_ready_event.synchronize() + sampled_token_ids = self.sampled_token_ids_cpu.squeeze( + -1).tolist() + # Replace placeholder token id with actual sampled id. + req_output_token_ids[-1] = sampled_token_ids[prev_index] + @property def num_reqs(self) -> int: return len(self.req_id_to_index)
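With async scheduling, the last output token id of each carried-over request is recorded as a placeholder until the previous step's sampled ids finish their device-to-host copy; the placeholders are then patched in right before logits processors and penalties need the real values. Below is a minimal sketch of that repair step, assuming -1 as the placeholder value; the function and parameter names are illustrative, not the InputBatch API.

```python
from typing import Optional

import torch

PLACEHOLDER = -1


def repair_output_token_ids(
    output_token_ids: list[list[int]],     # per request, current batch order
    req_ids: list[str],                    # current batch order
    prev_req_id_to_index: dict[str, int],  # batch order of the previous step
    sampled_token_ids_cpu: torch.Tensor,   # shape (prev_batch, 1), on CPU
    copy_ready_event: Optional[torch.cuda.Event] = None,
) -> None:
    sampled: Optional[list[int]] = None
    for index, req_id in enumerate(req_ids):
        prev_index = prev_req_id_to_index.get(req_id)
        toks = output_token_ids[index]
        # Only repair rows that still end with the placeholder and that
        # actually had a token sampled in the previous step.
        if prev_index is None or not toks or toks[-1] != PLACEHOLDER:
            continue
        if sampled is None:
            # Wait for the async copy only if at least one repair is needed.
            if copy_ready_event is not None:
                copy_ready_event.synchronize()
            sampled = sampled_token_ids_cpu.squeeze(-1).tolist()
        toks[-1] = sampled[prev_index]


out = [[5, PLACEHOLDER], [7, 9]]
repair_output_token_ids(out, ["a", "b"], {"a": 0, "b": 1},
                        torch.tensor([[11], [12]]))
print(out)  # [[5, 11], [7, 9]]
```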