[Feature] adapt to uva buffer and main2main (#6657)
### What this PR does / why we need it?
vLLM's model runner v2 uses a UVA (unified virtual addressing) buffer to prepare input data, but the NPU doesn't support UVA yet, so this PR implements a `UvaBufferWrapper` class to mimic the GPU's UVA backend. In addition, this PR makes some modifications to adapt to the newer vLLM main branch.
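For illustration, a minimal sketch of how the wrapper is intended to be used (hypothetical shapes; on Ascend this assumes `torch_npu` is installed so the `npu` device is available):

```python
import torch
from vllm_ascend.patch.worker.patch_v2_uva import UvaBufferWrapper

# A small per-request buffer, e.g. positions for up to 8 requests.
positions = UvaBufferWrapper(8, dtype=torch.int64)

# Writes through the .np / .cpu views are tracked row by row.
positions.np[0] = 42
positions.cpu[2:4] = torch.tensor([7, 9])

# Reading .uva copies only the dirty rows (0, 2, 3) to the NPU
# before returning the device tensor.
device_positions = positions.uva
```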
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- vLLM main:
13397841ab
---------
Signed-off-by: Ronald1995 <ronaldautomobile@163.com>
@@ -31,6 +31,7 @@ import vllm_ascend.patch.worker.patch_qwen3_next # noqa
import vllm_ascend.patch.worker.patch_qwen3_next_mtp # noqa
import vllm_ascend.patch.worker.patch_rejection_sampler # noqa
import vllm_ascend.patch.worker.patch_qwen3_next # noqa
-import vllm_ascend.patch.worker.patch_v2_egale # noqa
+import vllm_ascend.patch.worker.patch_v2_eagle # noqa
+import vllm_ascend.patch.worker.patch_v2_uva # noqa
import vllm_ascend.patch.worker.patch_huanyuan_vl # noqa
import vllm_ascend.patch.worker.patch_npugraph_ex_triton # noqa
@@ -16,10 +16,9 @@
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
import numpy as np
import torch
import vllm
-from vllm.v1.sample.metadata import SamplingMetadata
+from vllm.v1.worker.gpu.attn_utils import build_slot_mappings_by_layer
from vllm.v1.worker.gpu.input_batch import InputBatch
from vllm.v1.worker.gpu.sample.gumbel import gumbel_sample
from vllm.v1.worker.gpu.spec_decode.eagle import prepare_eagle_decode, prepare_eagle_inputs
@@ -31,7 +30,6 @@ from vllm_ascend.worker.v2.attn_utils import build_attn_metadata
def propose(
    self,
    input_batch: InputBatch,
-    sampling_metadata: SamplingMetadata,
    # [num_tokens, hidden_size]
    last_hidden_states: torch.Tensor,
    # num_layers x [num_tokens, hidden_size]
@@ -40,10 +38,14 @@ def propose(
    num_sampled: torch.Tensor,
    # [num_reqs]
    num_rejected: torch.Tensor,
-    # [num_reqs]
+    # [max_num_reqs]
    last_sampled: torch.Tensor,
-    # [num_reqs]
+    # [max_num_reqs]
    next_prefill_tokens: torch.Tensor,
+    # [max_num_reqs]
+    temperature: torch.Tensor,
+    # [max_num_reqs]
+    seeds: torch.Tensor,
) -> torch.Tensor:
    # NOTE(woosuk): To avoid CPU-GPU synchronization without CPU knowing the
    # number of rejected tokens, we maintain the size of eagle's input_ids and
@@ -74,13 +76,13 @@ def propose(
    last_hidden_states, hidden_states = self.run_model(
        num_tokens,
        input_batch.attn_metadata,
+        input_batch.slot_mappings,
        num_tokens_across_dp=None, # FIXME
    )
    sample_hidden_states = last_hidden_states[last_token_indices]
    logits = self.model.compute_logits(sample_hidden_states)

    num_reqs = input_batch.num_reqs
-    cu_num_logits = input_batch.cu_num_logits[:num_reqs]
    # NOTE(woosuk): For draft sampling, we only consider the temperature
    # and ignore the other sampling parameters such as top_k and top_p,
    # for simplicity and performance.
@@ -89,16 +91,23 @@ def propose(
    # NOTE(Ronald1995): torch.gather pollutes cached tensors such as self.input_buffers.positions.
    # The bug has been reported to the Huawei CANN team but is not fixed yet, so we
    # clone the tensors before calling torch.gather to avoid the issue.
-    temperature = self.temperature[:num_reqs].clone()
-    seeds = self.seeds[:num_reqs].clone()
+    idx_mapping = self.idx_mapping[:num_reqs]
+    idx_mapping.copy_(input_batch.idx_mapping)
+    self.temperature.copy_(temperature)
+    self.seeds.copy_(seeds)
    pos = self.input_buffers.positions[:num_reqs].clone()
-    # Gather the values and copy them to the pre-allocated buffers.
-    torch.gather(sampling_metadata.temperature, 0, cu_num_logits, out=temperature)
-    torch.gather(sampling_metadata.seeds, 0, cu_num_logits, out=seeds)
    torch.gather(input_batch.positions, 0, last_token_indices, out=pos)
    # NOTE(woosuk): We must add 1 to the positions to match the Gumbel noise
    # used for draft and target sampling.
-    draft_tokens = gumbel_sample(logits, temperature, seeds, pos + 1, apply_temperature=True)
+    draft_tokens = gumbel_sample(
+        logits,
+        idx_mapping,
+        self.temperature,
+        self.seeds,
+        pos + 1,
+        apply_temperature=True,
+    )
    if self.num_speculative_steps == 1:
        # Early exit.
        return draft_tokens.view(-1, 1)
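The clone-before-gather workaround above can be shown in isolation. A minimal standalone sketch (plain tensors, not vllm-ascend code): the long-lived buffer is cloned first, so the cached view is never used directly as `torch.gather`'s `out=` target.

```python
import torch

# Long-lived buffer that the CANN bug would otherwise corrupt on NPU.
positions_buffer = torch.arange(16)

# Workaround: gather into a clone of the view, not into the view itself.
pos = positions_buffer[:4].clone()
src = torch.tensor([3, 1, 4, 1, 5, 9, 2, 6])
idx = torch.tensor([0, 2, 4, 6])
torch.gather(src, 0, idx, out=pos)  # pos is now tensor([3, 4, 5, 2])
```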
@@ -117,9 +126,12 @@ def propose(
        self.max_model_len,
        self.max_num_reqs,
    )
-    query_start_loc = self.input_buffers.query_start_loc
-    query_start_loc_gpu = query_start_loc.gpu[: num_reqs + 1]
-    slot_mappings = self.block_tables.compute_slot_mappings(query_start_loc_gpu, pos)
+    query_start_loc = self.input_buffers.query_start_loc[: num_reqs + 1]
+    slot_mappings = self.block_tables.compute_slot_mappings(
+        idx_mapping,
+        query_start_loc,
+        pos,
+    )

    cudagraph_size = self.cudagraph_manager.get_cudagraph_size(num_reqs)
    if cudagraph_size is not None:
@@ -128,10 +140,8 @@ def propose(
        return self.draft_tokens[:num_reqs]

    # Run eager mode.
-    query_start_loc.np[: num_reqs + 1] = np.arange(num_reqs + 1)
-    query_start_loc_cpu = query_start_loc.cpu[: num_reqs + 1]
+    query_start_loc_cpu = torch.arange(num_reqs + 1, dtype=torch.int32, device="cpu")
    # HACK(woosuk)
    seq_lens_np = np.full(num_reqs, self.max_model_len, dtype=np.int32)
    block_tables = [x[:num_reqs] for x in self.block_tables.input_block_tables]

    # FIXME(woosuk): This is UNSAFE!!
@@ -139,16 +149,22 @@ def propose(
        attn_metadata_builders=self.attn_metadata_builders,
        num_reqs=num_reqs,
        num_tokens=num_reqs,
-        query_start_loc_gpu=query_start_loc_gpu,
+        query_start_loc_gpu=query_start_loc,
        query_start_loc_cpu=query_start_loc_cpu,
        max_query_len=1,
        seq_lens=self.input_buffers.seq_lens[:num_reqs],
        seq_lens_np=seq_lens_np,
        num_computed_tokens_cpu=None, # FIXME
        max_seq_len=self.max_model_len,
        block_tables=block_tables,
        slot_mappings=slot_mappings,
        kv_cache_config=self.kv_cache_config,
    )
-    self.generate_draft(num_reqs, attn_metadata, num_tokens_across_dp=None) # FIXME
+    slot_mappings_by_layer = build_slot_mappings_by_layer(slot_mappings, self.kv_cache_config)
+    self.generate_draft(
+        num_reqs,
+        attn_metadata,
+        slot_mappings_by_layer,
+        num_tokens_across_dp=None,
+    ) # FIXME
    return self.draft_tokens[:num_reqs]
vllm_ascend/patch/worker/patch_v2_uva.py (new file, 125 lines)
@@ -0,0 +1,125 @@
# Adapted from https://github.com/vllm-project/vllm/blob/main/vllm/v1/worker/gpu/block_table.py
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
from collections.abc import Callable, Sequence

import numpy as np
import torch
import vllm.v1.worker.gpu.buffer_utils


def get_row_indices_from_key(key: int | slice | tuple, dim_size: int) -> set[int]:
    """Get the set of row indices involved in the given key."""
    if isinstance(key, int):
        # Parse an integer index such as np[1]; handle negative indices.
        key = key if key >= 0 else dim_size + key
        if key < 0 or key >= dim_size:
            raise IndexError(f"row index {key} out of [0, {dim_size})")
        return {key}
    elif isinstance(key, slice):
        # Parse a slice such as np[1:3].
        start, stop, step = key.indices(dim_size)
        return set(range(start, stop, step))
    elif isinstance(key, tuple):
        # Parse a row slice such as np[1, :100].
        if len(key) == 0:
            return set(range(dim_size))
        return get_row_indices_from_key(key[0], dim_size)
    else:
        # For other key types such as list/ndarray, conservatively
        # treat all rows as touched.
        return set(range(dim_size))
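
# Illustrative examples (not part of the original patch; values are hypothetical):
#   get_row_indices_from_key(2, 8)                  -> {2}
#   get_row_indices_from_key(-1, 8)                 -> {7}
#   get_row_indices_from_key(slice(1, 4), 8)        -> {1, 2, 3}       # np[1:4]
#   get_row_indices_from_key((1, slice(0, 100)), 8) -> {1}             # np[1, :100]
#   get_row_indices_from_key([0, 3], 8)             -> {0, 1, ..., 7}  # fancy indexing: all rows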


class MonitoredNumPyArray:
    """A wrapper around a NumPy array that monitors modifications."""

    def __init__(self, array: np.ndarray, callback: Callable):
        self._array = array
        self._callback = callback

    def __setitem__(self, key, value):
        self._array[key] = value
        dim_size = self._array.shape[0]
        row_indices = get_row_indices_from_key(key, dim_size)
        for row in row_indices:
            self._callback(row)

    def __getitem__(self, key):
        return self._array[key]

    def __getattr__(self, name):
        return getattr(self._array, name)


class MonitoredTorchTensor:
    """A wrapper around a torch tensor that monitors modifications."""

    def __init__(self, tensor: torch.Tensor, callback: Callable):
        self._tensor = tensor
        self._callback = callback

    def __setitem__(self, key, value):
        self._tensor[key] = value
        dim_size = self._tensor.size(0)
        row_indices = get_row_indices_from_key(key, dim_size)
        for row in row_indices:
            self._callback(row)

    def __getitem__(self, key):
        return self._tensor[key]

    def __getattr__(self, name):
        return getattr(self._tensor, name)


class UvaBufferWrapper:
    """The Ascend NPU doesn't support UVA tensors directly. This wrapper
    provides CPU and NPU views of a logical UVA tensor and syncs dirty
    rows to the device on demand."""

    def __init__(self, size: int | Sequence[int], dtype: torch.dtype):
        self._cpu: torch.Tensor = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=True)
        self._np = self._cpu.numpy()
        self._uva: torch.Tensor = torch.zeros_like(self._cpu, device="npu")
        self._modified_indices: set[int] = set()

    def _mark_cpu_modified(self, key: int):
        self._modified_indices.add(key)

    @property
    def cpu(self):
        return MonitoredTorchTensor(self._cpu, self._mark_cpu_modified)

    @property
    def np(self):
        return MonitoredNumPyArray(self._np, self._mark_cpu_modified)

    @property
    def uva(self):
        """Get the device view of the buffer, syncing dirty rows first."""
        if self._modified_indices:
            # Sort for better memory access locality.
            dirty_rows = sorted(self._modified_indices)
            # We can't use the copy_ method here, because copy_ on an
            # index-selected tensor would allocate new memory.
            self._uva[dirty_rows] = self._cpu[dirty_rows].to(device="npu", non_blocking=True)
            self._modified_indices.clear()
        return self._uva


vllm.v1.worker.gpu.buffer_utils.UvaBuffer = UvaBufferWrapper
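
The final assignment monkey-patches vLLM's `UvaBuffer` symbol, so any runner code that constructs `UvaBuffer(...)` after this module is imported gets the Ascend wrapper instead. A minimal check of that effect (a sketch, assuming both packages are importable in the current environment):

```python
import vllm.v1.worker.gpu.buffer_utils as buffer_utils
import vllm_ascend.patch.worker.patch_v2_uva  # noqa: F401  (applies the patch on import)

# vLLM's UvaBuffer now resolves to the Ascend wrapper class.
assert buffer_utils.UvaBuffer.__name__ == "UvaBufferWrapper"
```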