diff --git a/vllm_ascend/patch/__init__.py b/vllm_ascend/patch/__init__.py
index 4e46c171..78a111bd 100644
--- a/vllm_ascend/patch/__init__.py
+++ b/vllm_ascend/patch/__init__.py
@@ -263,3 +263,12 @@
 #    https://gitcode.com/Ascend/torchair/pull/2575
 # Future Plan:
 #    Remove this patch when the PTA version used by vllm-ascend has been upgraded.
+# ** 14. File: worker/patch_v2_uva.py**
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+#    1. `vllm.v1.worker.gpu.states.UvaBuffer`
+#    Why:
+#       Ascend NPUs do not support UVA yet, so we need to patch it in vLLM.
+#    How:
+#       Replace UvaBuffer with a dummy class that mimics the interface of vLLM's UvaBuffer.
+#    Future Plan:
+#       Remove this patch once NPUs support UVA.
diff --git a/vllm_ascend/patch/worker/__init__.py b/vllm_ascend/patch/worker/__init__.py
index bd44362d..f2adab0c 100644
--- a/vllm_ascend/patch/worker/__init__.py
+++ b/vllm_ascend/patch/worker/__init__.py
@@ -31,6 +31,7 @@ import vllm_ascend.patch.worker.patch_qwen3_next  # noqa
 import vllm_ascend.patch.worker.patch_qwen3_next_mtp  # noqa
 import vllm_ascend.patch.worker.patch_rejection_sampler  # noqa
 import vllm_ascend.patch.worker.patch_qwen3_next  # noqa
-import vllm_ascend.patch.worker.patch_v2_egale  # noqa
+import vllm_ascend.patch.worker.patch_v2_eagle  # noqa
+import vllm_ascend.patch.worker.patch_v2_uva  # noqa
 import vllm_ascend.patch.worker.patch_huanyuan_vl  # noqa
 import vllm_ascend.patch.worker.patch_npugraph_ex_triton  # noqa
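
# ---------------------------------------------------------------------------
# Illustration (not part of the patch): the registration lines above work by
# import-time monkey-patching. Importing a patch module rebinds a name on the
# target vLLM module object, so every later lookup resolves to the
# replacement. A self-contained sketch of that mechanism with a stand-in
# module:
import types

target_module = types.ModuleType("target")  # stand-in for a vllm module
target_module.UvaBuffer = object            # the original class


class DummyUvaBuffer:                       # the replacement class
    pass


target_module.UvaBuffer = DummyUvaBuffer    # what importing patch_v2_uva does
assert target_module.UvaBuffer is DummyUvaBuffer
# ---------------------------------------------------------------------------
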
diff --git a/vllm_ascend/patch/worker/patch_v2_egale.py b/vllm_ascend/patch/worker/patch_v2_eagle.py
similarity index 81%
rename from vllm_ascend/patch/worker/patch_v2_egale.py
rename to vllm_ascend/patch/worker/patch_v2_eagle.py
index 61b25c97..4ec002b1 100644
--- a/vllm_ascend/patch/worker/patch_v2_egale.py
+++ b/vllm_ascend/patch/worker/patch_v2_eagle.py
@@ -16,10 +16,9 @@
 # limitations under the License.
 # This file is a part of the vllm-ascend project.
 #
-import numpy as np
 import torch
 import vllm
-from vllm.v1.sample.metadata import SamplingMetadata
+from vllm.v1.worker.gpu.attn_utils import build_slot_mappings_by_layer
 from vllm.v1.worker.gpu.input_batch import InputBatch
 from vllm.v1.worker.gpu.sample.gumbel import gumbel_sample
 from vllm.v1.worker.gpu.spec_decode.eagle import prepare_eagle_decode, prepare_eagle_inputs
@@ -31,7 +30,6 @@ from vllm_ascend.worker.v2.attn_utils import build_attn_metadata
 def propose(
     self,
     input_batch: InputBatch,
-    sampling_metadata: SamplingMetadata,
     # [num_tokens, hidden_size]
     last_hidden_states: torch.Tensor,
     # num_layers x [num_tokens, hidden_size]
@@ -40,10 +38,14 @@ def propose(
     num_sampled: torch.Tensor,
     # [num_reqs]
     num_rejected: torch.Tensor,
-    # [num_reqs]
+    # [max_num_reqs]
     last_sampled: torch.Tensor,
-    # [num_reqs]
+    # [max_num_reqs]
     next_prefill_tokens: torch.Tensor,
+    # [max_num_reqs]
+    temperature: torch.Tensor,
+    # [max_num_reqs]
+    seeds: torch.Tensor,
 ) -> torch.Tensor:
     # NOTE(woosuk): To avoid CPU-GPU synchronization without CPU knowing the
     # number of rejected tokens, we maintain the size of eagle's input_ids and
@@ -74,13 +76,13 @@ def propose(
     last_hidden_states, hidden_states = self.run_model(
         num_tokens,
         input_batch.attn_metadata,
+        input_batch.slot_mappings,
         num_tokens_across_dp=None,  # FIXME
     )
     sample_hidden_states = last_hidden_states[last_token_indices]
     logits = self.model.compute_logits(sample_hidden_states)

     num_reqs = input_batch.num_reqs
-    cu_num_logits = input_batch.cu_num_logits[:num_reqs]
     # NOTE(woosuk): For draft sampling, we only consider the temperature
     # and ignore the other sampling parameters such as top_k and top_p,
     # for simplicity and performance.
@@ -89,16 +91,23 @@ def propose(
     # NOTE(Ronald1995): torch.gather will pollute the cache such as self.input_buffers.positions
     # the bug is reported to huawei CANN team, but not fixed yet.
     # So we clone the tensors before calling torch.gather to avoid the issue.
-    temperature = self.temperature[:num_reqs].clone()
-    seeds = self.seeds[:num_reqs].clone()
+    idx_mapping = self.idx_mapping[:num_reqs]
+    idx_mapping.copy_(input_batch.idx_mapping)
+    self.temperature.copy_(temperature)
+    self.seeds.copy_(seeds)
     pos = self.input_buffers.positions[:num_reqs].clone()
     # Gather the values and copy them to the pre-allocated buffers.
-    torch.gather(sampling_metadata.temperature, 0, cu_num_logits, out=temperature)
-    torch.gather(sampling_metadata.seeds, 0, cu_num_logits, out=seeds)
     torch.gather(input_batch.positions, 0, last_token_indices, out=pos)
     # NOTE(woosuk): We must add 1 to the positions to match the Gumbel noise
     # used for draft and target sampling.
-    draft_tokens = gumbel_sample(logits, temperature, seeds, pos + 1, apply_temperature=True)
+    draft_tokens = gumbel_sample(
+        logits,
+        idx_mapping,
+        self.temperature,
+        self.seeds,
+        pos + 1,
+        apply_temperature=True,
+    )
     if self.num_speculative_steps == 1:
         # Early exit.
         return draft_tokens.view(-1, 1)
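
# ---------------------------------------------------------------------------
# Background for the gumbel_sample call above: sampling from
# softmax(logits / T) is equivalent to argmax(logits / T + G) with
# G ~ Gumbel(0, 1). Deriving G deterministically from (seed, position) keeps
# draft and target sampling reproducible, which is why both sides must use the
# same position offset (hence `pos + 1`). Pure-PyTorch sketch, not the Triton
# kernel; the seed mixing below is an arbitrary stand-in for tl.randint.
import torch


def gumbel_max_sample(logits: torch.Tensor, temp: float, seed: int, pos: int) -> torch.Tensor:
    gen = torch.Generator().manual_seed((seed * 1_000_003 + pos) & 0x7FFF_FFFF)
    uniform = torch.rand(logits.shape, generator=gen)
    gumbel_noise = -torch.log(-torch.log(uniform))
    return torch.argmax(logits / temp + gumbel_noise, dim=-1)


logits = torch.randn(2, 32)
# Same (seed, pos) -> same sampled tokens.
assert torch.equal(
    gumbel_max_sample(logits, 1.0, seed=7, pos=5),
    gumbel_max_sample(logits, 1.0, seed=7, pos=5),
)
# ---------------------------------------------------------------------------
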
@@ -117,9 +126,12 @@ def propose(
         self.max_model_len,
         self.max_num_reqs,
     )
-    query_start_loc = self.input_buffers.query_start_loc
-    query_start_loc_gpu = query_start_loc.gpu[: num_reqs + 1]
-    slot_mappings = self.block_tables.compute_slot_mappings(query_start_loc_gpu, pos)
+    query_start_loc = self.input_buffers.query_start_loc[: num_reqs + 1]
+    slot_mappings = self.block_tables.compute_slot_mappings(
+        idx_mapping,
+        query_start_loc,
+        pos,
+    )

     cudagraph_size = self.cudagraph_manager.get_cudagraph_size(num_reqs)
     if cudagraph_size is not None:
@@ -128,10 +140,8 @@ def propose(
         return self.draft_tokens[:num_reqs]

     # Run eager mode.
-    query_start_loc.np[: num_reqs + 1] = np.arange(num_reqs + 1)
-    query_start_loc_cpu = query_start_loc.cpu[: num_reqs + 1]
+    query_start_loc_cpu = torch.arange(num_reqs + 1, dtype=torch.int32, device="cpu")
     # HACK(woosuk)
-    seq_lens_np = np.full(num_reqs, self.max_model_len, dtype=np.int32)
     block_tables = [x[:num_reqs] for x in self.block_tables.input_block_tables]

     # FIXME(woosuk): This is UNSAFE!!
@@ -139,16 +149,22 @@ def propose(
         attn_metadata_builders=self.attn_metadata_builders,
         num_reqs=num_reqs,
         num_tokens=num_reqs,
-        query_start_loc_gpu=query_start_loc_gpu,
+        query_start_loc_gpu=query_start_loc,
         query_start_loc_cpu=query_start_loc_cpu,
+        max_query_len=1,
         seq_lens=self.input_buffers.seq_lens[:num_reqs],
-        seq_lens_np=seq_lens_np,
-        num_computed_tokens_cpu=None,  # FIXME
+        max_seq_len=self.max_model_len,
         block_tables=block_tables,
         slot_mappings=slot_mappings,
         kv_cache_config=self.kv_cache_config,
     )
-    self.generate_draft(num_reqs, attn_metadata, num_tokens_across_dp=None)  # FIXME
+    slot_mappings_by_layer = build_slot_mappings_by_layer(slot_mappings, self.kv_cache_config)
+    self.generate_draft(
+        num_reqs,
+        attn_metadata,
+        slot_mappings_by_layer,
+        num_tokens_across_dp=None,
+    )  # FIXME
     return self.draft_tokens[:num_reqs]
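
# ---------------------------------------------------------------------------
# Illustration: in the eager draft-decode path above, every request
# contributes exactly one query token per speculative step, so
# query_start_loc degenerates to [0, 1, ..., num_reqs]. Standalone check of
# that invariant:
import torch

num_reqs = 4
query_start_loc_cpu = torch.arange(num_reqs + 1, dtype=torch.int32, device="cpu")
query_lens = query_start_loc_cpu[1:] - query_start_loc_cpu[:-1]
assert query_lens.tolist() == [1, 1, 1, 1]
# ---------------------------------------------------------------------------
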
diff --git a/vllm_ascend/patch/worker/patch_v2_uva.py b/vllm_ascend/patch/worker/patch_v2_uva.py
new file mode 100644
index 00000000..613059f4
--- /dev/null
+++ b/vllm_ascend/patch/worker/patch_v2_uva.py
@@ -0,0 +1,125 @@
+# Adapted from https://github.com/vllm-project/vllm/blob/main/vllm/v1/worker/gpu/block_table.py
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is a part of the vllm-ascend project.
+#
+from collections.abc import Callable, Sequence
+
+import numpy as np
+import torch
+import vllm.v1.worker.gpu.buffer_utils
+
+
+def get_row_indices_from_key(key: int | slice | tuple, dim_size: int) -> set[int]:
+    """Get the set of row indices involved in the given key."""
+    if isinstance(key, int):
+        # Parse an integer index such as np[1].
+        key = key if key >= 0 else dim_size + key
+        # Handle negative indices.
+        if key < 0 or key >= dim_size:
+            raise IndexError(f"row index {key} out of [0, {dim_size})")
+        return {key}
+    elif isinstance(key, slice):
+        # Parse a slice such as np[1:3].
+        start, stop, step = key.indices(dim_size)
+        return set(range(start, stop, step))
+    elif isinstance(key, tuple):
+        # Parse a row slice such as np[1, :100].
+        if len(key) == 0:
+            return set(range(dim_size))
+        return get_row_indices_from_key(key[0], dim_size)
+    else:
+        # For other key types such as list/ndarray, conservatively return all rows.
+        return set(range(dim_size))
+
+
+class MonitoredNumPyArray:
+    """A wrapper around a NumPy array that monitors modifications."""
+
+    def __init__(self, array: np.ndarray, callback: Callable):
+        self._array = array
+        self._callback = callback
+
+    def __setitem__(self, key, value):
+        self._array[key] = value
+        dim_size = self._array.shape[0]
+        row_indices = get_row_indices_from_key(key, dim_size)
+        for row in row_indices:
+            self._callback(row)
+
+    def __getitem__(self, key):
+        return self._array[key]
+
+    def __getattr__(self, name):
+        return getattr(self._array, name)
+
+
+class MonitoredTorchTensor:
+    """A wrapper around a torch tensor that monitors modifications."""
+
+    def __init__(self, tensor: torch.Tensor, callback: Callable):
+        self._tensor = tensor
+        self._callback = callback
+
+    def __setitem__(self, key, value):
+        self._tensor[key] = value
+        dim_size = self._tensor.size(0)
+        row_indices = get_row_indices_from_key(key, dim_size)
+        for row in row_indices:
+            self._callback(row)
+
+    def __getitem__(self, key):
+        return self._tensor[key]
+
+    def __getattr__(self, name):
+        return getattr(self._tensor, name)
+
+
+class UvaBufferWrapper:
+    """Ascend NPUs don't support UVA tensors directly. This is a wrapper class
+    that provides CPU and NPU views of a UVA tensor."""
+
+    def __init__(self, size: int | Sequence[int], dtype: torch.dtype):
+        self._cpu: torch.Tensor = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=True)
+        self._np = self._cpu.numpy()
+        self._uva: torch.Tensor = torch.zeros_like(self._cpu, device="npu")
+        self._modified_indices: set[int] = set()
+
+    def _mark_cpu_modified(self, key: int):
+        self._modified_indices.add(key)
+
+    @property
+    def cpu(self):
+        return MonitoredTorchTensor(self._cpu, self._mark_cpu_modified)
+
+    @property
+    def np(self):
+        return MonitoredNumPyArray(self._np, self._mark_cpu_modified)
+
+    @property
+    def uva(self):
+        """Get the device data of the buffer."""
+        if self._modified_indices:
+            # Sort for better memory access locality.
+            dirty_rows = sorted(self._modified_indices)
+            # We can't use the copy_ method here, because copy_ with an index
+            # tensor mallocs new memory.
+            self._uva[dirty_rows] = self._cpu[dirty_rows].to(device="npu", non_blocking=True)
+            self._modified_indices.clear()
+        return self._uva
+
+
+vllm.v1.worker.gpu.buffer_utils.UvaBuffer = UvaBufferWrapper
diff --git a/vllm_ascend/platform.py b/vllm_ascend/platform.py
index da3ca2f7..ac7fdead 100644
--- a/vllm_ascend/platform.py
+++ b/vllm_ascend/platform.py
@@ -551,7 +551,7 @@ class NPUPlatform(Platform):
         vllm_config: VllmConfig,
         dp_metadata,
         virtual_engine: int = 0,
-        num_tokens: int | None = None,
+        num_tokens: int = 0,
         num_tokens_across_dp: torch.Tensor | None = None,
         cudagraph_runtime_mode=None,
         batch_descriptor=None,
@@ -601,10 +601,6 @@ class NPUPlatform(Platform):
         if not envs_vllm.VLLM_USE_V2_MODEL_RUNNER:
             return {}

-        num_actual_tokens = list(attn_metadata.values())[0].num_actual_tokens
-        if num_tokens is None:
-            num_tokens = num_actual_tokens
-
         moe_comm_type = select_moe_comm_method(
             num_tokens,
             vllm_config,
@@ -636,10 +632,13 @@ class NPUPlatform(Platform):
         # TODO(Levi-JQ): another PR to normalize the enabling logic for sp/fc2
         flashcomm_v2_enabled = flashcomm2_enable() and tp_world_size > 1 and num_tokens is not None

-        pad_size = 0
+        pad_size = None
+        padded_length = None
         if sp_enabled or flashcomm_v2_enabled:
             pad_size = (tp_world_size - (num_tokens % tp_world_size)) % tp_world_size
+            if num_tokens is None and attn_metadata is not None:
+                num_tokens = list(attn_metadata.values())[0].num_actual_tokens
             dp_world_size = get_dp_group().world_size
             if dp_world_size > 1 and dp_metadata is not None:
                 max_tokens_across_dp = dp_metadata.max_tokens_across_dp_cpu.item()
@@ -648,8 +647,9 @@ class NPUPlatform(Platform):
                 pad_size = padded_length - num_tokens
             else:
                 max_tokens_across_dp = num_tokens
-
+        mc2_mask = None
         if num_tokens is not None:
+            num_actual_tokens = num_tokens
             # NOTE: token num which need to pad to when mc2
             padded_num_tokens = math.ceil(max_tokens_across_dp / tp_world_size) * tp_world_size
             reserved_mc2_mask = get_mc2_mask()
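
# ---------------------------------------------------------------------------
# The padding arithmetic used above, in isolation: token counts are rounded
# up to a multiple of the TP world size so that every rank receives an equal
# shard. Values are illustrative.
import math

tp_world_size = 4
num_tokens = 13
pad_size = (tp_world_size - (num_tokens % tp_world_size)) % tp_world_size
assert pad_size == 3  # 13 + 3 == 16, a multiple of 4

max_tokens_across_dp = 13
padded_num_tokens = math.ceil(max_tokens_across_dp / tp_world_size) * tp_world_size
assert padded_num_tokens == 16
# ---------------------------------------------------------------------------
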
diff --git a/vllm_ascend/worker/v2/README.md b/vllm_ascend/worker/v2/README.md
index 3a83901d..976372aa 100644
--- a/vllm_ascend/worker/v2/README.md
+++ b/vllm_ascend/worker/v2/README.md
@@ -4,3 +4,5 @@
 This directory contains the new model runner which is under active development.

 please see [Model Runner V2](https://github.com/vllm-project/vllm-ascend/issues/5208) to get specific plans.
+
+Supported vLLM version: main@1339784
diff --git a/vllm_ascend/worker/v2/aclgraph_utils.py b/vllm_ascend/worker/v2/aclgraph_utils.py
index dc63f7b0..bb37653c 100644
--- a/vllm_ascend/worker/v2/aclgraph_utils.py
+++ b/vllm_ascend/worker/v2/aclgraph_utils.py
@@ -35,9 +35,14 @@ from vllm_ascend.worker.v2.utils import torch_cuda_wrapper
 class AclGraphManager(CudaGraphManager):
     """ACL Graph Manager for Ascend NPUs."""

-    def __init__(self, vllm_config: VllmConfig, device: torch.device):
+    def __init__(
+        self,
+        vllm_config: VllmConfig,
+        use_mrope: bool,
+        device: torch.device,
+    ):
         with torch_cuda_wrapper():
-            super().__init__(vllm_config, device)
+            super().__init__(vllm_config, use_mrope, device)

     def capture_graph(
         self,
diff --git a/vllm_ascend/worker/v2/attn_utils.py b/vllm_ascend/worker/v2/attn_utils.py
index aa4ddb75..46a92094 100644
--- a/vllm_ascend/worker/v2/attn_utils.py
+++ b/vllm_ascend/worker/v2/attn_utils.py
@@ -42,17 +42,21 @@ def get_attn_mask_builder(device: torch.device):


 def build_attn_metadata(
+    *,
     attn_metadata_builders: list[AttentionMetadataBuilder],
     num_reqs: int,
     num_tokens: int,
     query_start_loc_gpu: torch.Tensor,
     query_start_loc_cpu: torch.Tensor,
+    max_query_len: int,
     seq_lens: torch.Tensor,
-    seq_lens_np: np.ndarray,
-    num_computed_tokens_cpu: torch.Tensor | None,
+    max_seq_len: int,
     block_tables: Sequence[torch.Tensor],
     slot_mappings: torch.Tensor,
     kv_cache_config: KVCacheConfig,
+    # extra attributes for ascend npus.
+    seq_lens_np: np.ndarray | None = None,
+    num_computed_tokens_cpu: torch.Tensor | None = None,
     positions: torch.Tensor | None = None,
     attn_state: Any | None = None,
     graph_pad_size: int = -1,
@@ -61,9 +65,13 @@
 ) -> dict[str, Any]:
     """Build attention metadata for Ascend NPUs."""
     # TODO(Ronald1995): optimize AscendCommonAttentionMetadata.
-    max_query_len = int(query_start_loc_cpu.max())
-    seq_lens_cpu = torch.from_numpy(seq_lens_np)
-    max_seq_len = int(seq_lens_cpu.max())
+
+    # seq_lens_np is only used on Ascend NPUs and may be None in the
+    # spec-decode case; fill it with max_seq_len so that
+    # `attn_metadata_builder.build` does not raise an error.
+    if seq_lens_np is None:
+        seq_lens_np = np.full(num_reqs, max_seq_len, dtype=np.int32)
+    seq_lens_cpu = torch.from_numpy(seq_lens_np)[:num_reqs]
     # torch_npu._reshape_and_cache operator requires slot_mappings to
     # be torch.int32.
     slot_mappings = slot_mappings.to(torch.int32)
@@ -77,7 +85,7 @@
     common_attn_metadata = AscendCommonAttentionMetadata(
         query_start_loc=query_start_loc_gpu,
         query_start_loc_cpu=query_start_loc_cpu,
-        seq_lens_cpu=seq_lens_cpu[:num_reqs],
+        seq_lens_cpu=seq_lens_cpu,
         seq_lens=seq_lens[:num_reqs],
         num_reqs=num_reqs,
         num_actual_tokens=num_tokens,
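
# ---------------------------------------------------------------------------
# The bare `*` added to build_attn_metadata's signature makes all parameters
# keyword-only, so call sites (model_runner.py and the eagle patch) cannot
# silently misalign positional arguments as parameters are added or
# reordered. Minimal illustration with a hypothetical function:
def build(*, num_reqs: int, num_tokens: int) -> tuple[int, int]:
    return num_reqs, num_tokens


build(num_reqs=2, num_tokens=8)  # OK
# build(2, 8)                    # TypeError: takes 0 positional arguments
# ---------------------------------------------------------------------------
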
diff --git a/vllm_ascend/worker/v2/input_batch.py b/vllm_ascend/worker/v2/input_batch.py
index e3b87cf2..31e5d90e 100644
--- a/vllm_ascend/worker/v2/input_batch.py
+++ b/vllm_ascend/worker/v2/input_batch.py
@@ -16,10 +16,11 @@
 # limitations under the License.
 # This file is a part of the vllm-ascend project.
 #
+from dataclasses import asdict, dataclass

 import numpy as np
 import torch
-from vllm.v1.worker.gpu.input_batch import InputBuffers
+from vllm.v1.worker.gpu.input_batch import InputBatch, InputBuffers


 class AscendInputBuffers(InputBuffers):
@@ -29,20 +30,12 @@ class AscendInputBuffers(InputBuffers):
         self,
         max_num_reqs: int,
         max_num_tokens: int,
-        inputs_embeds_size: int,
-        vocab_size: int,
-        dtype: torch.dtype,
         device: torch.device,
-        pin_memory: bool,
     ):
         super().__init__(
             max_num_reqs,
             max_num_tokens,
-            inputs_embeds_size,
-            vocab_size,
-            dtype,
             device,
-            pin_memory,
         )
         # Create seq_lens_cpu and seq_lens_np.
         # npu's attention backend still needs seq_lens on CPU side.
@@ -54,3 +47,36 @@ class AscendInputBuffers(InputBuffers):
         # seq_len_np and seq_lens_cpu share the same memory.
         # define seq_lens_np for easier calculation with numpy.
         self.seq_lens_np: np.ndarray = self.seq_lens_cpu.numpy()
+
+
+@dataclass
+class AscendInputBatch(InputBatch):
+    """Input batch for Ascend NPUs."""
+
+    # Create seq_lens_np.
+    # npu's attention backend still needs seq_lens on CPU side.
+    seq_lens_np: np.ndarray
+
+    @classmethod
+    def make_dummy(
+        cls,
+        num_reqs: int,
+        num_tokens: int,
+        input_buffers: AscendInputBuffers,
+        device: torch.device,
+    ) -> "AscendInputBatch":
+        """Override the make_dummy method to calculate seq_lens_np."""
+        input_batch = InputBatch.make_dummy(
+            num_reqs,
+            num_tokens,
+            input_buffers,
+            device,
+        )
+        # seq_len equals query_len here.
+        input_buffers.seq_lens_np[:num_reqs] = num_tokens // num_reqs
+        input_buffers.seq_lens_np[num_reqs - 1] += num_tokens % num_reqs
+        # Pad for full CUDA graph mode.
+        input_buffers.seq_lens_np[num_reqs:] = 0
+        seq_lens_np = input_buffers.seq_lens_np[:num_reqs]
+        input_batch.seq_lens_np = seq_lens_np
+        return cls(**asdict(input_batch), seq_lens_np=seq_lens_np)
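
# ---------------------------------------------------------------------------
# The dummy seq_lens built in make_dummy above split num_tokens as evenly as
# possible across requests, folding the remainder into the last request:
import numpy as np

num_reqs, num_tokens = 3, 10
seq_lens = np.full(num_reqs, num_tokens // num_reqs, dtype=np.int32)
seq_lens[-1] += num_tokens % num_reqs
assert seq_lens.tolist() == [3, 3, 4]
assert int(seq_lens.sum()) == num_tokens
# ---------------------------------------------------------------------------
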
diff --git a/vllm_ascend/worker/v2/model_runner.py b/vllm_ascend/worker/v2/model_runner.py
index 2b5abce5..88188d29 100644
--- a/vllm_ascend/worker/v2/model_runner.py
+++ b/vllm_ascend/worker/v2/model_runner.py
@@ -22,9 +22,11 @@ import torch
 from vllm.config import VllmConfig
 from vllm.logger import init_logger
 from vllm.v1.core.sched.output import SchedulerOutput
+from vllm.v1.worker.gpu.attn_utils import build_slot_mappings_by_layer
+from vllm.v1.worker.gpu.buffer_utils import async_copy_to_gpu
 from vllm.v1.worker.gpu.input_batch import (
-    InputBatch,
     combine_sampled_and_draft_tokens,
+    expand_idx_mapping,
     prepare_pos_seq_lens,
     prepare_prefill_inputs,
 )
@@ -32,11 +34,11 @@ from vllm.v1.worker.gpu.model_runner import GPUModelRunner

 from vllm_ascend.worker.v2.aclgraph_utils import AclGraphManager
 from vllm_ascend.worker.v2.attn_utils import build_attn_metadata, build_attn_state
-from vllm_ascend.worker.v2.input_batch import AscendInputBuffers
+from vllm_ascend.worker.v2.input_batch import AscendInputBatch, AscendInputBuffers
 from vllm_ascend.worker.v2.sample.sampler import AscendSampler
 from vllm_ascend.worker.v2.spec_decode import init_speculator
 from vllm_ascend.worker.v2.spec_decode.eagle import AscendEagleSpeculator
-from vllm_ascend.worker.v2.states import AscendRequestState, uva_wrapper
+from vllm_ascend.worker.v2.states import AscendRequestState
 from vllm_ascend.worker.v2.utils import torch_cuda_wrapper

 logger = init_logger(__name__)
@@ -46,7 +48,7 @@ class NPUModelRunner(GPUModelRunner):
     """Model runner for Ascend NPUs."""

     def __init__(self, vllm_config: VllmConfig, device: torch.device):
-        with torch_cuda_wrapper(), uva_wrapper():
+        with torch_cuda_wrapper():
             super().__init__(vllm_config, device)

         # because we will override these attribute, delete these attribute to
@@ -59,8 +61,9 @@ class NPUModelRunner(GPUModelRunner):

         # NPU specific initializations can be added below.
         self.cudagraph_manager: AclGraphManager = AclGraphManager(
-            vllm_config,
-            device,
+            self.vllm_config,
+            self.uses_mrope,
+            self.device,
         )

         # we define AscendEagleSpeculator in vllm_ascend.worker.v2.spec_decode.eagle
@@ -79,23 +82,22 @@ class NPUModelRunner(GPUModelRunner):
             num_speculative_steps=self.num_speculative_steps,
             vocab_size=self.vocab_size,
             device=self.device,
-            pin_memory=self.pin_memory,
         )
         # AscendInputBuffers has extra `seq_lens_cpu` attribute.
         # so reinitialize input_buffers here.
         self.input_buffers: AscendInputBuffers = AscendInputBuffers(
             max_num_reqs=self.max_num_reqs,
             max_num_tokens=self.max_num_tokens,
-            inputs_embeds_size=self.inputs_embeds_size,
-            vocab_size=self.vocab_size,
-            dtype=self.dtype,
             device=self.device,
-            pin_memory=self.pin_memory,
         )
         # we need to adjust triton operators in sampler,
         # so reinitialize sampler here.
         self.sampler: AscendSampler = AscendSampler(
+            max_num_reqs=self.max_num_reqs,
+            vocab_size=self.vocab_size,
+            device=self.device,
             logprobs_mode=self.model_config.logprobs_mode,
+            num_speculative_tokens=self.num_speculative_steps + 1,
         )

         # we need to copy num_computed_tokens back to cpu to help
@@ -108,32 +110,31 @@ class NPUModelRunner(GPUModelRunner):
             self.max_num_reqs,
             dtype=torch.int32,
             device="cpu",
-            pin_memory=self.pin_memory,
+            pin_memory=True,
         )

     def prepare_inputs(
         self,
         scheduler_output: SchedulerOutput,
         num_tokens_after_padding: int,
-    ) -> InputBatch:
+    ) -> AscendInputBatch:
         """Override GPUModelRunner.prepare_inputs for Ascend NPUs.
         npu attention backends need seq_lens_cpu to work.
         so we need to prepare seq_lens_cpu here.
         """
         num_tokens = scheduler_output.total_num_scheduled_tokens
         assert num_tokens > 0
-        num_reqs = len(scheduler_output.num_scheduled_tokens)
+        num_tokens_per_req = scheduler_output.num_scheduled_tokens
+        num_reqs = len(num_tokens_per_req)

         # Decode first, then prefill.
         # batch_idx -> req_id
-        req_ids = sorted(
-            scheduler_output.num_scheduled_tokens.keys(),
-            key=lambda k: scheduler_output.num_scheduled_tokens[k],
-        )
+        req_ids = sorted(num_tokens_per_req, key=num_tokens_per_req.get)  # type: ignore
         self._update_seq_lens_cpu(scheduler_output, req_ids)

-        num_scheduled_tokens = np.array([scheduler_output.num_scheduled_tokens[i] for i in req_ids], dtype=np.int32)
+        numtoks_iter = map(num_tokens_per_req.get, req_ids)
+        num_scheduled_tokens = np.fromiter(numtoks_iter, dtype=np.int32, count=num_reqs)
         num_valid_tokens = num_scheduled_tokens
         if scheduler_output.scheduled_spec_decode_tokens:
             num_valid_tokens = np.array(
@@ -150,81 +151,94 @@ class NPUModelRunner(GPUModelRunner):
             num_scheduled_tokens,
             num_valid_tokens,
         )
-
-        idx_mapping_list = [self.req_states.req_id_to_index[req_id] for req_id in req_ids]
-        idx_mapping = self.input_buffers.idx_mapping
-        idx_mapping.np[:num_reqs] = idx_mapping_list
-        idx_mapping_np = idx_mapping.np[:num_reqs]
-        idx_mapping_cpu = idx_mapping.cpu[:num_reqs]
-        idx_mapping_npu = idx_mapping.copy_to_gpu(num_reqs)
+        idx_mapping_iter = map(self.req_states.req_id_to_index.get, req_ids)
+        idx_mapping_np = np.fromiter(idx_mapping_iter, dtype=np.int32, count=num_reqs)
+        idx_mapping_cpu = torch.from_numpy(idx_mapping_np)
+        idx_mapping = async_copy_to_gpu(idx_mapping_cpu, device=self.device)
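
# ---------------------------------------------------------------------------
# The np.fromiter construction above, standalone: with an explicit `count`,
# NumPy fills a preallocated array directly from the iterator, avoiding the
# intermediate Python list the old list-comprehension version built.
import numpy as np

req_id_to_index = {"a": 2, "b": 0, "c": 1}
req_ids = ["b", "c", "a"]
idx_mapping_np = np.fromiter(
    map(req_id_to_index.get, req_ids), dtype=np.int32, count=len(req_ids)
)
assert idx_mapping_np.tolist() == [0, 1, 2]
# ---------------------------------------------------------------------------
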
         # Get the number of draft tokens for each request.
-        if not scheduler_output.scheduled_spec_decode_tokens:
+        draft_tokens = scheduler_output.scheduled_spec_decode_tokens
+        if not draft_tokens:
             # No draft token scheduled (common case).
             total_num_draft_tokens = 0
             total_num_logits = num_reqs
+            cu_num_logits_np = np.arange(num_reqs + 1, dtype=np.int32)
             cu_num_logits = torch.arange(num_reqs + 1, device=self.device, dtype=torch.int32)
+            expanded_idx_mapping = idx_mapping
+            expanded_local_pos = torch.zeros(num_reqs, dtype=torch.int32, device=self.device)
         else:
-            draft_tokens = scheduler_output.scheduled_spec_decode_tokens
             num_draft_tokens = np.array(
-                [len(draft_tokens[req_id]) if req_id in draft_tokens else 0 for req_id in req_ids],
+                [len(draft_tokens.get(req_id, ())) for req_id in req_ids],
                 dtype=np.int32,
             )
             total_num_draft_tokens = int(num_draft_tokens.sum())
             total_num_logits = num_reqs + total_num_draft_tokens
-            np.cumsum(
-                num_draft_tokens + 1,
-                out=self.input_buffers.cu_num_logits.np[1 : num_reqs + 1],
+            num_logits = num_draft_tokens + 1
+            cu_num_logits_np = np.empty(num_reqs + 1, dtype=np.int32)
+            cu_num_logits_np[0] = 0
+            np.cumsum(num_logits, out=cu_num_logits_np[1:])
+            cu_num_logits = async_copy_to_gpu(cu_num_logits_np, device=self.device)
+
+            max_expand_len = self.num_speculative_steps + 1
+            expanded_idx_mapping, expanded_local_pos = expand_idx_mapping(
+                idx_mapping, total_num_logits, cu_num_logits, max_expand_len
             )
-            cu_num_logits = self.input_buffers.cu_num_logits.copy_to_gpu(num_reqs + 1)

         # Block tables: num_kv_cache_groups x [num_reqs, max_num_blocks]
-        block_tables = self.block_tables.gather_block_tables(idx_mapping_npu)
+        block_tables = self.block_tables.gather_block_tables(idx_mapping)

         # Get query_start_loc.
-        np.cumsum(
-            num_scheduled_tokens,
-            out=self.input_buffers.query_start_loc.np[1 : num_reqs + 1],
-        )
+        query_start_loc_np = np.empty(self.max_num_reqs + 1, dtype=np.int32)
+        query_start_loc_np[0] = 0
+        np.cumsum(num_scheduled_tokens, out=query_start_loc_np[1 : num_reqs + 1])
         # Pad for full CUDA graph mode.
         # Some attention backends like FA3 require query_start_loc to be non-decreasing.
-        self.input_buffers.query_start_loc.np[num_reqs + 1 :] = num_tokens
-        self.input_buffers.query_start_loc.copy_to_gpu()
-        query_start_loc_gpu = self.input_buffers.query_start_loc.gpu[: num_reqs + 1]
-        query_start_loc_cpu = self.input_buffers.query_start_loc.cpu[: num_reqs + 1]
-        query_start_loc_np = self.input_buffers.query_start_loc.np[: num_reqs + 1]
+        query_start_loc_np[num_reqs + 1 :] = num_tokens
+        async_copy_to_gpu(query_start_loc_np, out=self.input_buffers.query_start_loc)
+
+        query_start_loc_np = query_start_loc_np[: num_reqs + 1]
+        query_start_loc_cpu = torch.from_numpy(query_start_loc_np)
+        query_start_loc = self.input_buffers.query_start_loc[: num_reqs + 1]
+        max_query_len = num_scheduled_tokens.max().item()

         # Get prefill tokens.
         prepare_prefill_inputs(
             self.input_buffers.input_ids,
             self.req_states.next_prefill_tokens,
-            idx_mapping_npu,
-            query_start_loc_gpu,
-            # use prefill_token_ids.copy_to_gpu() because npu doesn't
-            # support uva buffer.
-            self.req_states.prefill_token_ids.copy_to_gpu(),
+            idx_mapping,
+            query_start_loc,
+            self.req_states.prefill_token_ids.gpu,
             self.req_states.prefill_len.gpu,
-            self.req_states.num_computed_tokens,
+            self.req_states.num_computed_tokens.gpu,
         )
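
# ---------------------------------------------------------------------------
# Layout of cu_num_logits built above: each request contributes one logit for
# its last token plus one per scheduled draft token, stored as a prefix sum
# with a leading zero:
import numpy as np

num_draft_tokens = np.array([2, 0, 1], dtype=np.int32)
cu_num_logits_np = np.empty(len(num_draft_tokens) + 1, dtype=np.int32)
cu_num_logits_np[0] = 0
np.cumsum(num_draft_tokens + 1, out=cu_num_logits_np[1:])
assert cu_num_logits_np.tolist() == [0, 3, 4, 6]
# ---------------------------------------------------------------------------
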
         # Prepare positions and seq_lens.
         prepare_pos_seq_lens(
-            idx_mapping_npu,
-            query_start_loc_gpu,
-            self.req_states.num_computed_tokens,
+            idx_mapping,
+            query_start_loc,
+            self.req_states.num_computed_tokens.gpu,
             self.input_buffers.positions,
             self.input_buffers.seq_lens,
         )
         seq_lens = self.input_buffers.seq_lens[:num_reqs]

+        # Prepare M-RoPE positions.
+        if self.uses_mrope:
+            self.mrope_states.prepare_mrope_positions(
+                idx_mapping,
+                query_start_loc,
+                self.req_states.prefill_len.gpu,
+                self.req_states.num_computed_tokens.gpu,
+            )
+
         # Some input token ids are directly read from the last sampled tokens
         # and draft tokens. Also, get the logits indices to sample tokens from.
         logits_indices = combine_sampled_and_draft_tokens(
             self.input_buffers.input_ids,
-            idx_mapping_npu,
+            idx_mapping,
             self.req_states.last_sampled_tokens,
-            query_start_loc_gpu,
+            query_start_loc,
             seq_lens,
             self.req_states.prefill_len.gpu,
             self.req_states.draft_tokens,
@@ -234,9 +248,10 @@ class NPUModelRunner(GPUModelRunner):

         # Compute slot mappings: [num_kv_cache_groups, num_tokens]
         slot_mappings = self.block_tables.compute_slot_mappings(
-            query_start_loc_gpu, self.input_buffers.positions[:num_tokens]
+            idx_mapping, query_start_loc, self.input_buffers.positions[:num_tokens]
         )
-
+        # Layer name -> slot mapping.
+        slot_mappings_by_layer = build_slot_mappings_by_layer(slot_mappings, self.kv_cache_config)
         # Layer name -> attention metadata.
         # TODO(Ronald1995): try to add a new method `build_attn_metadata` in
         # vllm gpu_model_runner_v2, maybe we don't overwrite `prepare_inputs`
@@ -245,37 +260,51 @@ class NPUModelRunner(GPUModelRunner):
             attn_metadata_builders=self.attn_metadata_builders,
             num_reqs=num_reqs,
             num_tokens=num_tokens,
-            query_start_loc_gpu=query_start_loc_gpu,
+            query_start_loc_gpu=query_start_loc,
             query_start_loc_cpu=query_start_loc_cpu,
+            max_query_len=max_query_len,
             seq_lens=self.input_buffers.seq_lens,
-            seq_lens_np=self.input_buffers.seq_lens_np,
-            num_computed_tokens_cpu=self.req_states.num_computed_tokens_cpu[idx_mapping_cpu],
+            max_seq_len=self.max_model_len,
             block_tables=block_tables,
             slot_mappings=slot_mappings,
             kv_cache_config=self.kv_cache_config,
+            # extra attributes for ascend npus.
+            seq_lens_np=self.input_buffers.seq_lens_np,
+            num_computed_tokens_cpu=self.req_states.num_computed_tokens_cpu[idx_mapping_cpu],
             attn_state=attn_state,
         )
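
# ---------------------------------------------------------------------------
# The block-table arithmetic behind compute_slot_mappings above: a token at
# position p of a request lives in physical slot
# block_table[p // block_size] * block_size + p % block_size. Sketch with
# illustrative sizes:
import torch

block_size = 4
block_table = torch.tensor([7, 2, 9])  # physical block ids for one request
pos = torch.tensor([0, 3, 5, 10])      # token positions
slots = block_table[pos // block_size] * block_size + pos % block_size
assert slots.tolist() == [28, 31, 9, 38]
# ---------------------------------------------------------------------------
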

         input_ids = self.input_buffers.input_ids[:num_tokens_after_padding]
         positions = self.input_buffers.positions[:num_tokens_after_padding]
-        return InputBatch(
+        mrope_positions = None
+        if self.uses_mrope:
+            mrope_positions = self.mrope_states.mrope_positions
+            mrope_positions = mrope_positions[:, :num_tokens_after_padding]
+        return AscendInputBatch(
             req_ids=req_ids,
             num_reqs=num_reqs,
-            idx_mapping=idx_mapping_npu,
+            idx_mapping=idx_mapping,
             idx_mapping_np=idx_mapping_np,
+            expanded_idx_mapping=expanded_idx_mapping,
+            expanded_local_pos=expanded_local_pos,
             num_scheduled_tokens=num_scheduled_tokens,
             num_tokens=num_tokens,
             num_tokens_after_padding=num_tokens_after_padding,
             num_draft_tokens=total_num_draft_tokens,
-            query_start_loc=query_start_loc_gpu,
+            query_start_loc=query_start_loc,
             query_start_loc_np=query_start_loc_np,
             seq_lens=seq_lens,
-            seq_lens_np=self.input_buffers.seq_lens_np,
             input_ids=input_ids,
             positions=positions,
+            mrope_positions=mrope_positions,
+            inputs_embeds=None,
             attn_metadata=attn_metadata,
+            slot_mappings=slot_mappings_by_layer,
             logits_indices=logits_indices,
             cu_num_logits=cu_num_logits,
+            cu_num_logits_np=cu_num_logits_np,
+            has_structured_output_reqs=scheduler_output.has_structured_output_requests,
+            seq_lens_np=self.input_buffers.seq_lens_np,
         )

     def postprocess(
@@ -303,7 +332,7 @@ class NPUModelRunner(GPUModelRunner):
         with torch.npu.stream(self.num_computed_tokens_stream):
             self.num_computed_tokens_stream.wait_stream(default_stream)
             self.num_computed_tokens_cpu.copy_(
-                self.req_states.num_computed_tokens,
+                self.req_states.num_computed_tokens.gpu,
                 non_blocking=True,
             )
             self.num_computed_tokens_event.record()
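
# ---------------------------------------------------------------------------
# The postprocess pattern above, reduced to its skeleton: run the D2H copy on
# a dedicated stream that waits on the default stream, record an event, and
# synchronize on the event only when the host actually needs the values.
# Hedged sketch assuming the torch.npu stream/event API mirrors torch.cuda:
import torch


def async_d2h_copy(gpu_buf, cpu_buf, copy_stream, event):
    with torch.npu.stream(copy_stream):
        copy_stream.wait_stream(torch.npu.current_stream())
        cpu_buf.copy_(gpu_buf, non_blocking=True)
        event.record()
    # Later, before reading cpu_buf on the host:
    # event.synchronize()
# ---------------------------------------------------------------------------
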
diff --git a/vllm_ascend/worker/v2/sample/gumbel.py b/vllm_ascend/worker/v2/sample/gumbel.py
index fdcdacdd..ad1715da 100644
--- a/vllm_ascend/worker/v2/sample/gumbel.py
+++ b/vllm_ascend/worker/v2/sample/gumbel.py
@@ -30,6 +30,7 @@ def _gumbel_sample_kernel(
     local_max_stride,
     logits_ptr,
     logits_stride,
+    idx_mapping_ptr,
     seeds_ptr,
     pos_ptr,
     temp_ptr,
@@ -37,24 +38,26 @@ def _gumbel_sample_kernel(
     BLOCK_SIZE: tl.constexpr,
     APPLY_TEMPERATURE: tl.constexpr,
 ):
-    req_idx = tl.program_id(0)
+    batch_idx = tl.program_id(0)
+    req_state_idx = tl.load(idx_mapping_ptr + batch_idx)
+
     block_idx = tl.program_id(1)
     block = block_idx * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
     mask = block < vocab_size
     logits = tl.load(
-        logits_ptr + req_idx * logits_stride + block,
+        logits_ptr + batch_idx * logits_stride + block,
         mask=mask,
         other=float("-inf"),
     )
     logits = logits.to(tl.float32)

-    temp = tl.load(temp_ptr + req_idx).to(tl.float32)
+    temp = tl.load(temp_ptr + req_state_idx).to(tl.float32)
     if temp != 0.0:
         # Calculate the seed for gumbel noise.
-        seed = tl.load(seeds_ptr + req_idx)
+        seed = tl.load(seeds_ptr + req_state_idx)
         # NOTE(Ronald1995): change pos's dtype to tl.int32, because triton-ascend's
         # compiler doesn't support unint64 of pos arg.
-        pos = tl.load(pos_ptr + req_idx).to(tl.int32)
+        pos = tl.load(pos_ptr + batch_idx).to(tl.int32)
         gumbel_seed = tl.randint(seed, pos)

         # Generate gumbel noise.
@@ -66,7 +69,7 @@ def _gumbel_sample_kernel(

     # Apply temperature.
     if APPLY_TEMPERATURE:
-        # NOTE(woosuk): Match the behavior of _penalties_and_temperature_kernel.
+        # NOTE(woosuk): Match the behavior of _temperature_kernel.
         # E.g., if the kernel uses tl.div_rn, we should use tl.div_rn here too.
         logits = logits / temp

@@ -76,21 +79,18 @@ def _gumbel_sample_kernel(
     idx = tl.argmax(logits, axis=0)
     token_id = block_idx * BLOCK_SIZE + idx
     value = tl.max(logits, axis=0)
-    tl.store(local_argmax_ptr + req_idx * local_argmax_stride + block_idx, token_id)
-    tl.store(local_max_ptr + req_idx * local_max_stride + block_idx, value)
+    tl.store(local_argmax_ptr + batch_idx * local_argmax_stride + block_idx, token_id)
+    tl.store(local_max_ptr + batch_idx * local_max_stride + block_idx, value)


 def gumbel_sample(
     logits: torch.Tensor,  # [num_reqs, vocab_size]
+    idx_mapping: torch.Tensor,  # [num_reqs]
     temperature: torch.Tensor,  # [num_reqs]
     seed: torch.Tensor,  # [num_reqs]
     pos: torch.Tensor,  # [num_reqs]
     apply_temperature: bool,
 ) -> torch.Tensor:
-    """Override the function because there are some bugs
-    when _gumbel_sample_kernel runs on npu, we need to make some fixes.
-    you could read NOTE(Ronald1995) comments to understand.
-    """
     num_reqs, vocab_size = logits.shape
     BLOCK_SIZE = 1024
     num_blocks = triton.cdiv(vocab_size, BLOCK_SIZE)
@@ -114,6 +114,7 @@ def gumbel_sample(
         local_max.stride(0),
         logits,
         logits.stride(0),
+        idx_mapping,
         seed,
         pos,
         temperature,
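
# ---------------------------------------------------------------------------
# What the kernel above computes: a per-(request, block) local (argmax, max)
# pair, reduced afterwards to the global argmax over the vocabulary. An
# equivalent PyTorch reduction, using a vocab size divisible by the block
# size for simplicity:
import torch

torch.manual_seed(0)
num_reqs, vocab_size, BLOCK_SIZE = 2, 8, 4
logits = torch.randn(num_reqs, vocab_size)
blocked = logits.view(num_reqs, vocab_size // BLOCK_SIZE, BLOCK_SIZE)
local_max, local_argmax = blocked.max(dim=-1)
local_argmax = local_argmax + torch.arange(0, vocab_size, BLOCK_SIZE)  # global ids
best_block = local_max.argmax(dim=-1)
token_ids = local_argmax[torch.arange(num_reqs), best_block]
assert torch.equal(token_ids, logits.argmax(dim=-1))
# ---------------------------------------------------------------------------
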
diff --git a/vllm_ascend/worker/v2/sample/sampler.py b/vllm_ascend/worker/v2/sample/sampler.py
index 610b5e75..4bbb0fa3 100644
--- a/vllm_ascend/worker/v2/sample/sampler.py
+++ b/vllm_ascend/worker/v2/sample/sampler.py
@@ -14,22 +14,25 @@
 # limitations under the License.
 # This file is a part of the vllm-ascend project.
 #
-
+import numpy as np
 import torch
-from vllm.v1.sample.metadata import SamplingMetadata
 from vllm.v1.sample.ops.topk_topp_sampler import apply_top_k_top_p
+from vllm.v1.worker.gpu.sample.gumbel import apply_temperature
 from vllm.v1.worker.gpu.sample.min_p import apply_min_p
 from vllm.v1.worker.gpu.sample.sampler import Sampler

 from vllm_ascend.worker.v2.sample.gumbel import gumbel_sample
-from vllm_ascend.worker.v2.sample.penalties import apply_penalties_and_temperature


 class AscendSampler(Sampler):
     def sample(
         self,
         logits: torch.Tensor,
-        sampling_metadata: SamplingMetadata,
+        idx_mapping: torch.Tensor,
+        idx_mapping_np: np.ndarray,
+        pos: torch.Tensor,
+        input_ids: torch.Tensor,
+        expanded_local_pos: torch.Tensor,
     ) -> tuple[torch.Tensor, torch.Tensor]:
         """Override sample method because we need to override
         triton operators called in the method.
@@ -37,19 +40,42 @@ class AscendSampler(Sampler):
         # Copy logits to a new FP32 tensor.
         logits = torch.empty_like(logits, dtype=torch.float32).copy_(logits)

-        # Apply penalties and temperature in place.
-        apply_penalties_and_temperature(logits, sampling_metadata)
-        # Apply min_p in place.
-        if sampling_metadata.min_p is not None:
-            apply_min_p(logits, sampling_metadata.min_p)
-        # Apply top_k and/or top_p. This might return a new tensor.
-        logits = apply_top_k_top_p(logits, sampling_metadata.top_k, sampling_metadata.top_p)
+        # Apply logit bias (e.g., allowed_token_ids, min_tokens) in place.
+        self.logit_bias_state.apply_logit_bias(logits, idx_mapping, idx_mapping_np, pos)
+        # Apply penalties in place.
+        self.penalties_state.apply_penalties(
+            logits,
+            idx_mapping,
+            idx_mapping_np,
+            input_ids,
+            expanded_local_pos,
+            self.num_speculative_tokens,
+        )
+
+        # Apply temperature in place.
+        apply_temperature(logits, idx_mapping, self.sampling_states.temperature.gpu)
+
+        # Apply min_p in place if any request has a non-zero min_p.
+        do_min_p = self.sampling_states.do_min_p(idx_mapping_np)
+        if do_min_p:
+            apply_min_p(logits, idx_mapping, self.sampling_states.min_p.gpu)
+
+        # Apply top_k and/or top_p. This might return a new tensor.
+        do_top_k = self.sampling_states.do_top_k(idx_mapping_np)
+        top_k = self.sampling_states.top_k.gpu[idx_mapping] if do_top_k else None
+        do_top_p = self.sampling_states.do_top_p(idx_mapping_np)
+        top_p = self.sampling_states.top_p.gpu[idx_mapping] if do_top_p else None
+        if do_top_k or do_top_p:
+            logits = apply_top_k_top_p(logits, top_k, top_p)
+
+        # Sample the next token.
         sampled = gumbel_sample(
             logits,
-            sampling_metadata.temperature,
-            sampling_metadata.seeds,
-            sampling_metadata.pos,
+            idx_mapping,
+            self.sampling_states.temperature.gpu,
+            self.sampling_states.seeds.gpu,
+            pos,
             apply_temperature=False,
         )
         return sampled, logits
diff --git a/vllm_ascend/worker/v2/spec_decode/eagle.py b/vllm_ascend/worker/v2/spec_decode/eagle.py
index 81c5ae8d..c9ee8c9f 100644
--- a/vllm_ascend/worker/v2/spec_decode/eagle.py
+++ b/vllm_ascend/worker/v2/spec_decode/eagle.py
@@ -42,14 +42,23 @@ class AscendEagleSpeculator(EagleSpeculator):

     def propose(
         self,
-        input_batch,
-        sampling_metadata,
-        last_hidden_states,
-        aux_hidden_states,
-        num_sampled,
-        num_rejected,
-        last_sampled,
-        next_prefill_tokens,
+        input_batch: InputBatch,
+        # [num_tokens, hidden_size]
+        last_hidden_states: torch.Tensor,
+        # num_layers x [num_tokens, hidden_size]
+        aux_hidden_states: list[torch.Tensor] | None,
+        # [num_reqs]
+        num_sampled: torch.Tensor,
+        # [num_reqs]
+        num_rejected: torch.Tensor,
+        # [max_num_reqs]
+        last_sampled: torch.Tensor,
+        # [max_num_reqs]
+        next_prefill_tokens: torch.Tensor,
+        # [max_num_reqs]
+        temperature: torch.Tensor,
+        # [max_num_reqs]
+        seeds: torch.Tensor,
     ):
         """Override GPU EagleSpeculator.propose for Ascend NPUs,
         because npu attention metadata needs more information,
@@ -62,19 +71,21 @@ class AscendEagleSpeculator(EagleSpeculator):
         with build_attn_metadata_wrapper():
             return super().propose(
                 input_batch,
-                sampling_metadata,
                 last_hidden_states,
                 aux_hidden_states,
                 num_sampled,
                 num_rejected,
                 last_sampled,
                 next_prefill_tokens,
+                temperature,
+                seeds,
             )

     def generate_draft(
         self,
         num_reqs: int,
         attn_metadata: dict[str, Any],
+        slot_mappings: dict[str, torch.Tensor],
         num_tokens_across_dp,
     ):
         """Override GPU EagleSpeculator.generate_draft for Ascend NPUs, because
@@ -86,6 +97,7 @@ class AscendEagleSpeculator(EagleSpeculator):
         return super().generate_draft(
             num_reqs,
             attn_metadata,
+            slot_mappings,
             num_tokens_across_dp,
         )

@@ -94,6 +106,7 @@ class AscendEagleSpeculator(EagleSpeculator):
         self,
         num_tokens: int,
         attn_metadata: dict[str, Any],
+        slot_mappings: dict[str, torch.Tensor] | None,
         num_tokens_across_dp: torch.Tensor | None,
     ) -> tuple[torch.Tensor, torch.Tensor]:
         """Override GPU EagleSpeculator.run_model for Ascend NPUs, because
@@ -103,6 +116,7 @@ class AscendEagleSpeculator(EagleSpeculator):
         last_hidden_states, hidden_states = super().run_model(
             num_tokens,
             attn_metadata,
+            slot_mappings,
             num_tokens_across_dp,
         )
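
# ---------------------------------------------------------------------------
# The gating pattern above, standalone: per-request top_k/top_p values live in
# persistent request-state rows and are gathered through idx_mapping only when
# some request in the batch actually enables them, so the common
# temperature-only path skips apply_top_k_top_p entirely. Names below are
# illustrative stand-ins for the sampling_states buffers:
import torch

top_k_rows = torch.tensor([0, 40, 0])  # 0 means "top_k disabled"
idx_mapping = torch.tensor([1, 2])     # batch index -> request-state row
gathered = top_k_rows[idx_mapping]
do_top_k = bool((gathered > 0).any())
top_k = gathered if do_top_k else None
assert do_top_k and top_k.tolist() == [40, 0]
# ---------------------------------------------------------------------------
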
diff --git a/vllm_ascend/worker/v2/states.py b/vllm_ascend/worker/v2/states.py
index 84d35fad..f1c24702 100644
--- a/vllm_ascend/worker/v2/states.py
+++ b/vllm_ascend/worker/v2/states.py
@@ -17,11 +17,7 @@
 # This file is a part of the vllm-ascend project.
 #
-from contextlib import contextmanager
-
 import torch
-import vllm
-from vllm.v1.utils import CpuGpuBuffer
 from vllm.v1.worker.gpu.states import RequestState


@@ -36,7 +32,6 @@ class AscendRequestState(RequestState):
         num_speculative_steps: int,
         vocab_size: int,
         device: torch.device,
-        pin_memory: bool,
     ):
         super().__init__(
             max_num_reqs,
@@ -45,11 +40,7 @@ class AscendRequestState(RequestState):
             num_speculative_steps,
             vocab_size,
             device,
-            pin_memory,
         )
-        # because we will override these attribute, delete these attribute to
-        # make sure it's collected by python gc immediately.
-        del self.prefill_token_ids
         # vllm gpu_model_runner_v2 deprecate the seqs_lens_cpu attribute,
         # because they think most attention backends do not need it.
         # However, Ascend attention backend muse uses seqs_lens_cpu,
@@ -60,11 +51,6 @@ class AscendRequestState(RequestState):
             dtype=torch.int32,
             device="cpu",
         )
-        # NOTE(Ronald1995): Ascend NPUs do not support UVA yet,
-        # so we use CpuGpuBuffer to allocate prefill_token_ids buffer.
-        self.prefill_token_ids: CpuGpuBuffer = self._make_buffer(  # type: ignore
-            (self.max_num_reqs, self.max_model_len), dtype=torch.int32
-        )

     def add_request(
         self,
@@ -72,32 +58,12 @@ class AscendRequestState(RequestState):
         prompt_len,
         prefill_token_ids,
         num_computed_tokens,
-        sampling_params,
-        lora_request,
     ):
         super().add_request(
             req_id,
             prompt_len,
             prefill_token_ids,
             num_computed_tokens,
-            sampling_params,
-            lora_request,
         )
         req_idx = self.req_id_to_index[req_id]
         self.num_computed_tokens_cpu[req_idx] = num_computed_tokens
-
-
-@contextmanager
-def uva_wrapper():
-    """Context manager to disable UVA for Ascend NPUs."""
-
-    class UvaBufferWrapper:
-        def __init__(self, *args, **kwargs):
-            pass
-
-    try:
-        # TODO(Ronald1995): rectify this when NPU support uva.
-        vllm.v1.worker.gpu.states.UvaBuffer = UvaBufferWrapper
-        yield
-    finally:
-        pass