### What this PR does / why we need it?
Judging from the diff below, this PR applies formatting-only cleanup to the files listed: lines are re-wrapped to a longer limit, `Union[...]`/`Optional[...]` annotations become PEP 604 `X | None` unions, and trailing commas are added. No behavioral changes are apparent.

**Scope of Changes**:

| File Path |
| :--- |
| `vllm_ascend/quantization/compressed_tensors/compressed_tensors.py` |
| `vllm_ascend/quantization/quant_config.py` |
| `vllm_ascend/quantization/utils.py` |
| `vllm_ascend/quantization/w4a16.py` |
| `vllm_ascend/quantization/w4a4_flatquant_dynamic.py` |
| `vllm_ascend/quantization/w4a8_dynamic.py` |
| `vllm_ascend/quantization/w8a16.py` |
| `vllm_ascend/quantization/w8a8.py` |
| `vllm_ascend/quantization/w8a8_dynamic.py` |
| `vllm_ascend/quantization/w8a8_pdmix.py` |
| `vllm_ascend/quantization/w8a8mxfp8.py` |
| `vllm_ascend/sample/rejection_sampler.py` |
| `vllm_ascend/sample/sampler.py` |
| `vllm_ascend/worker/block_table.py` |
### Does this PR introduce _any_ user-facing change?
No user-facing change is apparent; the diff is formatting-only.
### How was this patch tested?
- vLLM version: v0.13.0
- vLLM main: 2c24bc6996
Signed-off-by: MrZ20 <2609716663@qq.com>
Of the files listed above, only the diff for `vllm_ascend/worker/block_table.py` is reproduced below.
```diff
@@ -1,5 +1,3 @@
-from typing import Optional, Union
-
 import numpy as np
 import torch
 from vllm.distributed import get_dcp_group, get_pcp_group
@@ -8,17 +6,18 @@ from vllm.v1.utils import CpuGpuBuffer


 class BlockTable:

-    def __init__(self,
-                 block_size: int,
-                 max_num_reqs: int,
-                 max_num_blocks_per_req: int,
-                 max_num_batched_tokens: int,
-                 pin_memory: bool,
-                 device: torch.device,
-                 kernel_sizes: Union[list[int], None] = None,
-                 cp_kv_cache_interleave_size: int = 1,
-                 num_speculative_tokens: int = 0):
+    def __init__(
+        self,
+        block_size: int,
+        max_num_reqs: int,
+        max_num_blocks_per_req: int,
+        max_num_batched_tokens: int,
+        pin_memory: bool,
+        device: torch.device,
+        kernel_sizes: list[int] | None = None,
+        cp_kv_cache_interleave_size: int = 1,
+        num_speculative_tokens: int = 0,
+    ):
         self.max_num_reqs = max_num_reqs
         self.max_num_blocks_per_req = max_num_blocks_per_req
         self.max_num_batched_tokens = max_num_batched_tokens
@@ -28,8 +27,7 @@ class BlockTable:

         try:
             self.pcp_world_size = get_pcp_group().world_size
-            self.pcp_rank = get_pcp_group(
-            ).rank_in_group if self.pcp_world_size > 1 else 0
+            self.pcp_rank = get_pcp_group().rank_in_group if self.pcp_world_size > 1 else 0
             self.dcp_world_size = get_dcp_group().world_size
             self.dcp_rank = get_dcp_group().rank_in_group
         except AssertionError:
```
```diff
@@ -49,42 +47,37 @@ class BlockTable:
         # Find the first kernel size that divides physical_block_size evenly
         selected_kernel_size = None
         for kernel_size in kernel_sizes:
-            if kernel_size > 0 \
-                    and self.physical_block_size % kernel_size == 0:
+            if kernel_size > 0 and self.physical_block_size % kernel_size == 0:
                 selected_kernel_size = kernel_size
                 break

         if selected_kernel_size is None:
             raise ValueError(
                 f"None of the kernel sizes {kernel_sizes} can divide "
-                f"physical block size {self.physical_block_size} evenly")
+                f"physical block size {self.physical_block_size} evenly"
+            )

         self.block_size = selected_kernel_size
         self.logical_block_size = selected_kernel_size
-        self.blocks_per_phys_block = (self.physical_block_size //
-                                      self.logical_block_size)
+        self.blocks_per_phys_block = self.physical_block_size // self.logical_block_size
         if self.blocks_per_phys_block > 1:
             self.use_hybrid_blocks = True
         else:
             self.use_hybrid_blocks = False

         if self.use_hybrid_blocks:
-            logical_table_size = (max_num_blocks_per_req *
-                                  self.blocks_per_phys_block)
+            logical_table_size = max_num_blocks_per_req * self.blocks_per_phys_block
         else:
             logical_table_size = max_num_blocks_per_req

         duplicate_size = 1
         if self.pcp_world_size * self.dcp_world_size > 1:
             duplicate_size += num_speculative_tokens
-        self.block_table = self._make_buffer(max_num_reqs * duplicate_size,
-                                             logical_table_size,
-                                             dtype=torch.int32)
+        self.block_table = self._make_buffer(max_num_reqs * duplicate_size, logical_table_size, dtype=torch.int32)
         self.num_blocks_per_row = np.zeros(max_num_reqs, dtype=np.int32)
         self.slot_mapping = self._make_buffer(
-            self.max_num_batched_tokens +
-            2 * self.pcp_world_size * self.max_num_reqs,
-            dtype=torch.int32)
+            self.max_num_batched_tokens + 2 * self.pcp_world_size * self.max_num_reqs, dtype=torch.int32
+        )

         self.kernel_sizes = kernel_sizes
         self.cp_kv_cache_interleave_size = cp_kv_cache_interleave_size
```
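For context on the hybrid-block logic this hunk reformats: the constructor picks the first kernel size that evenly divides the physical block size, then derives the physical-to-logical split ratio from it. A minimal standalone sketch (not vllm-ascend code; names mirror the diff, values are illustrative):

```python
# Standalone sketch: kernel-size selection and the physical->logical split
# ratio, mirroring the hunk above. All concrete values are illustrative.
def select_kernel_size(physical_block_size: int, kernel_sizes: list[int]) -> int:
    # The first kernel size that divides the physical block size evenly wins.
    for kernel_size in kernel_sizes:
        if kernel_size > 0 and physical_block_size % kernel_size == 0:
            return kernel_size
    raise ValueError(
        f"None of the kernel sizes {kernel_sizes} can divide "
        f"physical block size {physical_block_size} evenly"
    )


physical_block_size = 128
logical_block_size = select_kernel_size(physical_block_size, [64, 32])  # -> 64
blocks_per_phys_block = physical_block_size // logical_block_size       # -> 2
use_hybrid_blocks = blocks_per_phys_block > 1                           # -> True
# With a split ratio of 2, the logical table needs twice as many columns:
# each physical block occupies blocks_per_phys_block consecutive logical slots.
```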
```diff
@@ -103,7 +96,7 @@ class BlockTable:
         num_blocks = len(block_ids)
         start = self.num_blocks_per_row[row_idx]

-        self.block_table.np[row_idx, start:start + num_blocks] = block_ids
+        self.block_table.np[row_idx, start : start + num_blocks] = block_ids
         self.num_blocks_per_row[row_idx] += num_blocks

     def add_row(self, block_ids: list[int], row_idx: int) -> None:
@@ -112,8 +105,7 @@ class BlockTable:

     def move_row(self, src: int, tgt: int) -> None:
         num_blocks = self.num_blocks_per_row[src]
-        self.block_table.np[tgt, :num_blocks] = self.block_table.np[
-            src, :num_blocks]
+        self.block_table.np[tgt, :num_blocks] = self.block_table.np[src, :num_blocks]
         self.num_blocks_per_row[tgt] = num_blocks

     def swap_row(self, src: int, tgt: int) -> None:
@@ -124,8 +116,7 @@ class BlockTable:

         self.block_table.np[[src, tgt]] = self.block_table.np[[tgt, src]]

-    def compute_slot_mapping(self, req_indices: np.ndarray,
-                             positions: np.ndarray) -> None:
+    def compute_slot_mapping(self, req_indices: np.ndarray, positions: np.ndarray) -> None:
         # E.g., [0, 1, 0, 1, 2, 3, 4, 0, 1, 2]
         # -> [0, 0, K, K, K + 1, K + 1, K + 2, 2 * K, 2 * K, 2 * K + 1]
         # where K is the max_num_blocks_per_req and the block size is 2.
```
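The comment's example is easy to verify in NumPy. The `req_indices` below are an assumption consistent with the positions shown (three requests contributing 2, 5, and 3 tokens):

```python
# Reproducing the comment's example (standalone sketch).
import numpy as np

K = 4            # max_num_blocks_per_req, the "K" in the comment
block_size = 2
req_indices = np.array([0, 0, 1, 1, 1, 1, 1, 2, 2, 2])  # assumed pairing
positions = np.array([0, 1, 0, 1, 2, 3, 4, 0, 1, 2])

# Flat index into the (max_num_reqs x K) block table for each token.
block_table_indices = req_indices * K + positions // block_size
print(block_table_indices.tolist())
# [0, 0, 4, 4, 5, 5, 6, 8, 8, 9]  i.e. [0, 0, K, K, K+1, K+1, K+2, 2K, 2K, 2K+1]
```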
```diff
@@ -150,27 +141,30 @@ class BlockTable:
             # (always needed with unified tensor)
             # Each physical block is split into multiple logical blocks
             # The logical table has been expanded to accommodate this
-            block_table_indices = (req_indices * self.max_num_blocks_per_req *
-                                   self.blocks_per_phys_block +
-                                   logical_block_idx)
+            block_table_indices = (
+                req_indices * self.max_num_blocks_per_req * self.blocks_per_phys_block + logical_block_idx
+            )

             block_numbers = self.block_table.np.ravel()[block_table_indices]
             # Use virtual_block_size for mask calculation, which marks local
             # tokens.
             virtual_block_offsets = positions % virtual_block_size
             self.current_rank = self.dcp_world_size * self.pcp_rank + self.dcp_rank
-            mask = (virtual_block_offsets // self.cp_kv_cache_interleave_size %
-                    (self.dcp_world_size *
-                     self.pcp_world_size) == self.current_rank)
+            mask = (
+                virtual_block_offsets // self.cp_kv_cache_interleave_size % (self.dcp_world_size * self.pcp_world_size)
+                == self.current_rank
+            )
             # Calculate local block_offsets
-            block_offsets = virtual_block_offsets \
-                // (self.dcp_world_size * self.pcp_world_size * self.cp_kv_cache_interleave_size) \
-                * self.cp_kv_cache_interleave_size + virtual_block_offsets % self.cp_kv_cache_interleave_size
+            block_offsets = (
+                virtual_block_offsets
+                // (self.dcp_world_size * self.pcp_world_size * self.cp_kv_cache_interleave_size)
+                * self.cp_kv_cache_interleave_size
+                + virtual_block_offsets % self.cp_kv_cache_interleave_size
+            )
             # Calculate slot_mapping
             slot_mapping = block_numbers * self.block_size + block_offsets
             # Write final slots, use -1 for not-local
-            self.slot_mapping.np[:req_indices.shape[0]] = np.where(
-                mask, slot_mapping, -1)
+            self.slot_mapping.np[: req_indices.shape[0]] = np.where(mask, slot_mapping, -1)
         else:
             assert self.kernel_sizes is not None
             if self.block_size == self.kernel_sizes[0]:
```
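A standalone sketch of the ownership mask and local-offset arithmetic in this hunk, with assumed world sizes and interleave (illustration only, not vllm-ascend code):

```python
# Which context-parallel rank owns each token position, and where that token
# lands in the owner's KV cache. All concrete values below are assumed.
import numpy as np

dcp_world_size, pcp_world_size = 2, 2          # 4 CP ranks in total
interleave = 2                                 # cp_kv_cache_interleave_size
cp_size = dcp_world_size * pcp_world_size
virtual_block_offsets = np.arange(16)

# Ownership: round-robin across ranks in groups of `interleave` tokens.
owner = virtual_block_offsets // interleave % cp_size
print(owner.tolist())   # [0, 0, 1, 1, 2, 2, 3, 3, 0, 0, 1, 1, 2, 2, 3, 3]

# Local offset inside the owner's cache: each rank's tokens pack into a
# contiguous range, e.g. rank 0 stores positions 0, 1, 8, 9 at offsets 0..3.
local = (virtual_block_offsets // (cp_size * interleave) * interleave
         + virtual_block_offsets % interleave)
print(local[owner == 0].tolist())  # [0, 1, 2, 3]
```

Non-local positions get slot `-1` in the real code, via the `np.where(mask, slot_mapping, -1)` write above.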
```diff
@@ -183,15 +177,12 @@ class BlockTable:
                 # Each physical block is split into multiple logical blocks
                 # The logical table has been expanded to accommodate this
                 block_table_indices = (
-                    req_indices * self.max_num_blocks_per_req *
-                    self.blocks_per_phys_block + logical_block_idx)
+                    req_indices * self.max_num_blocks_per_req * self.blocks_per_phys_block + logical_block_idx
+                )

-                block_numbers = self.block_table.np.ravel(
-                )[block_table_indices]
+                block_numbers = self.block_table.np.ravel()[block_table_indices]
                 block_offsets = positions % self.block_size
-                np.add(block_numbers * self.block_size,
-                       block_offsets,
-                       out=self.slot_mapping.np[:req_indices.shape[0]])
+                np.add(block_numbers * self.block_size, block_offsets, out=self.slot_mapping.np[: req_indices.shape[0]])

     def commit_block_table(self, num_reqs: int) -> None:
         self.block_table.copy_to_gpu(num_reqs)
@@ -203,8 +194,7 @@ class BlockTable:
         self.block_table.fill_(0)
         self.block_table.cpu.fill_(0)

-    def _convert_physical_to_logical_blocks(
-            self, physical_blocks: np.ndarray) -> np.ndarray:
+    def _convert_physical_to_logical_blocks(self, physical_blocks: np.ndarray) -> np.ndarray:
         """Convert physical block IDs to logical block IDs."""
         if not self.use_hybrid_blocks:
             return physical_blocks
@@ -217,8 +207,7 @@ class BlockTable:
             # [1*split_ratio, 1*split_ratio+1, ...]
             # But we need to account for the fact that block 0 is special
             base_logical = phys_block * self.blocks_per_phys_block
-            logical_blocks.extend(
-                range(base_logical, base_logical + self.blocks_per_phys_block))
+            logical_blocks.extend(range(base_logical, base_logical + self.blocks_per_phys_block))

         return np.array(logical_blocks, dtype=np.int32)
```
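A standalone sketch of the physical-to-logical expansion this hunk reformats (mirroring the loop body, without the `use_hybrid_blocks` early return):

```python
# Each physical block ID fans out into blocks_per_phys_block consecutive
# logical IDs. Standalone sketch; inputs are illustrative.
import numpy as np


def physical_to_logical(physical_blocks: np.ndarray, blocks_per_phys_block: int) -> np.ndarray:
    logical_blocks: list[int] = []
    for phys_block in physical_blocks:
        base_logical = int(phys_block) * blocks_per_phys_block
        logical_blocks.extend(range(base_logical, base_logical + blocks_per_phys_block))
    return np.array(logical_blocks, dtype=np.int32)


print(physical_to_logical(np.array([0, 3, 5]), blocks_per_phys_block=2).tolist())
# [0, 1, 6, 7, 10, 11] -> physical block 3 becomes logical blocks 6 and 7, etc.
```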
```diff
@@ -234,27 +223,25 @@ class BlockTable:
         """Returns the numpy array of the block table."""
         return self.block_table.np

-    def _make_buffer(self, *size: int | torch.SymInt,
-                     dtype: torch.dtype) -> CpuGpuBuffer:
-        return CpuGpuBuffer(*size,
-                            dtype=dtype,
-                            device=self.device,
-                            pin_memory=self.pin_memory)
+    def _make_buffer(self, *size: int | torch.SymInt, dtype: torch.dtype) -> CpuGpuBuffer:
+        return CpuGpuBuffer(*size, dtype=dtype, device=self.device, pin_memory=self.pin_memory)


 class MultiGroupBlockTable:
     """The BlockTables for each KV cache group."""

-    def __init__(self,
-                 max_num_reqs: int,
-                 max_model_len: int,
-                 max_num_batched_tokens: int,
-                 pin_memory: bool,
-                 device: torch.device,
-                 block_sizes: list[int],
-                 num_speculative_tokens: int = 0,
-                 kernel_sizes: Optional[list[list[int]]] = None,
-                 cp_kv_cache_interleave_size: int = 1) -> None:
+    def __init__(
+        self,
+        max_num_reqs: int,
+        max_model_len: int,
+        max_num_batched_tokens: int,
+        pin_memory: bool,
+        device: torch.device,
+        block_sizes: list[int],
+        num_speculative_tokens: int = 0,
+        kernel_sizes: list[list[int]] | None = None,
+        cp_kv_cache_interleave_size: int = 1,
+    ) -> None:
         # Note(hc): each dcp rank only store
         # (max_model_len//dcp_world_size) tokens in kvcache,
         # so the block_size which used for calc max_num_blocks_per_req
```
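`_make_buffer` wraps vLLM's `CpuGpuBuffer`. As a mental model only, here is a toy stand-in for the pattern the surrounding code relies on; this is not vLLM's implementation:

```python
# Toy stand-in (NOT vLLM's CpuGpuBuffer): a pinned CPU tensor with a zero-copy
# NumPy view, mirrored on the device and copied over explicitly.
import torch


class ToyCpuGpuBuffer:
    def __init__(self, *size: int, dtype: torch.dtype,
                 device: torch.device, pin_memory: bool):
        self.cpu = torch.zeros(size, dtype=dtype, pin_memory=pin_memory)
        self.np = self.cpu.numpy()  # zero-copy view for fast CPU-side writes
        self.gpu = torch.zeros(size, dtype=dtype, device=device)

    def copy_to_gpu(self, n: int) -> None:
        # Copy only the first n rows that were actually written on the CPU side,
        # matching how commit_block_table() calls copy_to_gpu(num_reqs) above.
        self.gpu[:n].copy_(self.cpu[:n], non_blocking=True)
```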
```diff
@@ -274,24 +261,26 @@ class MultiGroupBlockTable:
             kernel_sizes = kernel_sizes * len(block_sizes)
         elif len(kernel_sizes) != len(block_sizes):
             raise ValueError(
-                f"kernel_sizes length ({len(kernel_sizes)}) must match "
-                f"block_sizes length ({len(block_sizes)})")
+                f"kernel_sizes length ({len(kernel_sizes)}) must match block_sizes length ({len(block_sizes)})"
+            )

         # Use zip to pair block_sizes with kernel_sizes one-to-one
         self.block_tables = [
             BlockTable(
-                block_size, max_num_reqs,
-                max(
-                    cdiv(max_model_len,
-                         block_size * dcp_world_size * pcp_world_size),
-                    1 + num_speculative_tokens), max_num_batched_tokens,
-                pin_memory, device, kernel_size_list,
-                cp_kv_cache_interleave_size, num_speculative_tokens)
+                block_size,
+                max_num_reqs,
+                max(cdiv(max_model_len, block_size * dcp_world_size * pcp_world_size), 1 + num_speculative_tokens),
+                max_num_batched_tokens,
+                pin_memory,
+                device,
+                kernel_size_list,
+                cp_kv_cache_interleave_size,
+                num_speculative_tokens,
+            )
             for block_size, kernel_size_list in zip(block_sizes, kernel_sizes)
         ]

-    def append_row(self, block_ids: tuple[list[int], ...],
-                   row_idx: int) -> None:
+    def append_row(self, block_ids: tuple[list[int], ...], row_idx: int) -> None:
         for i, block_table in enumerate(self.block_tables):
             block_table.append_row(block_ids[i], row_idx)
```
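For reference, the `max_num_blocks_per_req` expression passed to `BlockTable` above, worked through with assumed numbers (`cdiv` is ceiling division, as in `vllm.utils`):

```python
# Worked example (all concrete values assumed) of the per-request table width.
def cdiv(a: int, b: int) -> int:
    # Ceiling division without floats.
    return -(a // -b)


max_model_len = 8192
block_size = 128
dcp_world_size, pcp_world_size = 2, 2   # each CP rank stores 1/4 of the tokens
num_speculative_tokens = 3

max_num_blocks_per_req = max(
    cdiv(max_model_len, block_size * dcp_world_size * pcp_world_size),  # 8192/512 -> 16
    1 + num_speculative_tokens,  # floor so speculative decoding always has room -> 4
)
print(max_num_blocks_per_req)  # 16
```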
```diff
@@ -307,8 +296,7 @@ class MultiGroupBlockTable:
         for block_table in self.block_tables:
             block_table.swap_row(src, tgt)

-    def compute_slot_mapping(self, req_indices: np.ndarray,
-                             positions: np.ndarray) -> None:
+    def compute_slot_mapping(self, req_indices: np.ndarray, positions: np.ndarray) -> None:
         for block_table in self.block_tables:
             block_table.compute_slot_mapping(req_indices, positions)
```