Files
xc-llm-ascend/tests/ut/quantization/test_w4a16.py
linfeng-yuan 88d03a783f [refactor] replace scattered business kwargs with typed request objects and explicit stage boundaries (#7024)
### What this PR does / why we need it?
Refactor `vllm_ascend/ops/fused_moe` to replace scattered MoE business
`**kwargs` with typed request objects and explicit stage boundaries.

- Prepare, dispatch, MLP, and quant stages now have clearer ownership.
- Main MoE path no longer depends on business `kwargs.get(...)` lookups.
- Comm and dispatcher interfaces are request-only on the main path.
- UTs can assert stage-level fields directly instead of inferring
behavior indirectly.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
CI passed.

---------

Signed-off-by: linfeng-yuan <1102311262@qq.com>
2026-03-20 23:23:57 +08:00

308 lines
13 KiB
Python

from unittest.mock import Mock, patch
import torch
from tests.ut.base import TestBase
from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.quantization.methods.w4a16 import AscendW4A16FusedMoEMethod, pack_to_int32, unpack_from_int32
class TestUnpackFromInt32(TestBase):
def test_unpack_from_int32_packed_dim_1(self):
weight = torch.tensor([[305419896, -1420531520]], dtype=torch.int32)
shape = torch.Size([1, 8])
num_bits = 4
result = unpack_from_int32(weight, shape, num_bits, packed_dim=1)
self.assertEqual(result.dtype, torch.int8)
self.assertEqual(result.shape, shape)
def test_unpack_from_int32_packed_dim_0(self):
weight = torch.tensor([[305419896], [-1420531520]], dtype=torch.int32)
shape = torch.Size([8, 1])
num_bits = 4
result = unpack_from_int32(weight, shape, num_bits, packed_dim=0)
self.assertEqual(result.dtype, torch.int8)
self.assertEqual(result.shape, shape)
def test_unpack_from_int32_assertions(self):
with self.assertRaises(AssertionError):
weight = torch.tensor([[1, 2]], dtype=torch.int64)
unpack_from_int32(weight, torch.Size([8, 1]), 4)
with self.assertRaises(AssertionError):
weight = torch.tensor([[1, 2]], dtype=torch.int32)
unpack_from_int32(weight, torch.Size([8, 1]), 16)
class TestPackToInt32(TestBase):
@patch(
"vllm_ascend.quantization.methods.w4a16.torch_npu.npu_convert_weight_to_int4pack"
)
def test_pack_to_int32_int8(self, mock_npu_convert_weight_to_int4pack):
mock_npu_convert_weight_to_int4pack.return_value = torch.zeros(
(2, 4), dtype=torch.int32)
weight = torch.zeros((2, 8, 16), dtype=torch.int8)
result = pack_to_int32(weight)
self.assertEqual(result.dtype, torch.int32)
mock_npu_convert_weight_to_int4pack.assert_not_called()
self.assertEqual(result.shape, torch.Size([2, 8, 4]))
@patch(
"vllm_ascend.quantization.methods.w4a16.torch_npu.npu_convert_weight_to_int4pack"
)
def test_pack_to_int32_int32(self, mock_npu_convert_weight_to_int4pack):
def mock_convert_weight(weight):
return weight
mock_npu_convert_weight_to_int4pack.side_effect = mock_convert_weight
weight = torch.zeros((2, 8, 8), dtype=torch.int32)
result = pack_to_int32(weight)
self.assertEqual(result.dtype, torch.int32)
self.assertEqual(result.shape, weight.shape)
def test_pack_to_int32_assertion_dim(self):
with self.assertRaises(AssertionError):
weight = torch.zeros((8, 8), dtype=torch.int8)
pack_to_int32(weight)
def test_pack_to_int32_assertion_dtype(self):
with self.assertRaises(AssertionError):
weight = torch.zeros((2, 8, 8), dtype=torch.float32)
pack_to_int32(weight)
def test_pack_to_int32_assertion_divisible(self):
with self.assertRaises(AssertionError):
weight = torch.zeros((2, 8, 7), dtype=torch.int32)
pack_to_int32(weight)
with self.assertRaises(AssertionError):
weight = torch.zeros((2, 8, 7), dtype=torch.int8)
pack_to_int32(weight)
class TestAscendW4A16FusedMoEMethod(TestBase):
experts = 8
input_size = 32
output_size = 128
group_size = 32
@patch("vllm_ascend.quantization.methods.w4a16.get_ascend_config")
@patch("vllm_ascend.quantization.methods.w4a16.get_current_vllm_config")
def setUp(self, mock_get_current_vllm_config, mock_get_ascend_config):
mock_ascend_config = Mock()
mock_ascend_config.eplb_config.dynamic_eplb = False
mock_ascend_config.eplb_config.expert_map_record_path = None
mock_get_ascend_config.return_value = mock_ascend_config
mock_vllm_config = Mock()
mock_vllm_config.quant_config = Mock(quant_description={
"group_size": self.group_size,
})
mock_get_current_vllm_config.return_value = mock_vllm_config
self.quant_method = AscendW4A16FusedMoEMethod()
def test_init(self):
self.assertTrue(self.quant_method.transpose_weight)
self.assertEqual(self.quant_method.num_bits, 4)
self.assertEqual(self.quant_method.pack_factor, 8)
self.assertEqual(self.quant_method.group_size, self.group_size)
self.assertFalse(self.quant_method.dynamic_eplb)
def test_get_weight(self):
param_dict = self.quant_method.get_weight(self.experts,
self.input_size,
self.output_size,
torch.bfloat16)
self.assertEqual(param_dict["w13_weight_packed"].dtype, torch.int32)
expected_w13_shape = (self.experts, 2 * self.input_size,
self.output_size //
self.quant_method.pack_factor)
self.assertEqual(param_dict["w13_weight_packed"].shape,
expected_w13_shape)
self.assertEqual(param_dict["w2_weight_packed"].dtype, torch.int32)
expected_w2_shape = (self.experts, self.output_size,
self.input_size // self.quant_method.pack_factor)
self.assertEqual(param_dict["w2_weight_packed"].shape,
expected_w2_shape)
def test_get_dynamic_quant_param(self):
param_dict = self.quant_method.get_dynamic_quant_param(
self.experts, self.input_size, self.output_size, torch.bfloat16)
self.assertEqual(param_dict["w13_weight_scale"].dtype, torch.bfloat16)
expected_w13_scale_shape = (self.experts, 2 * self.input_size,
self.output_size // self.group_size)
self.assertEqual(param_dict["w13_weight_scale"].shape,
expected_w13_scale_shape)
self.assertEqual(param_dict["w2_weight_scale"].dtype, torch.bfloat16)
expected_w2_scale_shape = (self.experts, self.output_size,
self.input_size // self.group_size)
self.assertEqual(param_dict["w2_weight_scale"].shape,
expected_w2_scale_shape)
self.assertEqual(param_dict["w13_weight_shape"].dtype, torch.int32)
self.assertEqual(param_dict["w13_weight_shape"].shape,
(self.experts, 2))
self.assertEqual(param_dict["w2_weight_shape"].dtype, torch.int32)
self.assertEqual(param_dict["w2_weight_shape"].shape,
(self.experts, 2))
self.assertEqual(param_dict["w13_weight_offset"].dtype, torch.bfloat16)
self.assertEqual(param_dict["w13_weight_offset"].shape,
expected_w13_scale_shape)
self.assertEqual(param_dict["w2_weight_offset"].dtype, torch.bfloat16)
self.assertEqual(param_dict["w2_weight_offset"].shape,
expected_w2_scale_shape)
def build_layer(self):
"""Build a mock layer for testing"""
layer = torch.nn.Module()
w13_shape = (self.experts, 2 * self.input_size,
self.output_size // self.quant_method.pack_factor)
w2_shape = (self.experts, self.output_size,
self.input_size // self.quant_method.pack_factor)
layer.w13_weight_packed = torch.nn.Parameter(torch.randint(
-100, 100, w13_shape, dtype=torch.int32),
requires_grad=False)
layer.w2_weight_packed = torch.nn.Parameter(torch.randint(
-100, 100, w2_shape, dtype=torch.int32),
requires_grad=False)
w13_scale_shape = (self.experts, 2 * self.input_size,
self.output_size // self.group_size)
w2_scale_shape = (self.experts, self.output_size,
self.input_size // self.group_size)
layer.w13_weight_scale = torch.nn.Parameter(torch.ones(
w13_scale_shape, dtype=torch.bfloat16),
requires_grad=False)
layer.w2_weight_scale = torch.nn.Parameter(torch.ones(
w2_scale_shape, dtype=torch.bfloat16),
requires_grad=False)
layer.w13_weight_offset = torch.nn.Parameter(torch.zeros(
w13_scale_shape, dtype=torch.bfloat16),
requires_grad=False)
layer.w2_weight_offset = torch.nn.Parameter(torch.zeros(
w2_scale_shape, dtype=torch.bfloat16),
requires_grad=False)
layer.w13_weight_shape = torch.nn.Parameter(torch.tensor(
[[2 * self.input_size, self.output_size]] * self.experts,
dtype=torch.int32),
requires_grad=False)
layer.w2_weight_shape = torch.nn.Parameter(torch.tensor(
[[self.output_size, self.input_size]] * self.experts,
dtype=torch.int32),
requires_grad=False)
return layer
@patch(
"vllm_ascend.quantization.methods.w4a16.torch_npu.npu_convert_weight_to_int4pack"
)
def test_process_weights_after_loading_with_transpose(
self, mock_npu_convert_weight_to_int4pack):
def mock_convert_weight(weight):
new_shape = list(weight.shape)
new_shape[-1] = new_shape[-1] // 8
return torch.zeros(new_shape, dtype=torch.int32)
mock_npu_convert_weight_to_int4pack.side_effect = mock_convert_weight
layer = self.build_layer()
self.quant_method.transpose_weight = True
self.quant_method.process_weights_after_loading(layer)
self.assertEqual(layer.w13_weight_packed.data.shape,
torch.Size([8, 128, 8]))
self.assertEqual(layer.w2_weight_packed.data.shape,
torch.Size([8, 32, 16]))
self.assertEqual(layer.w13_weight_scale.data.shape,
torch.Size([8, 4, 64]))
self.assertEqual(layer.w2_weight_scale.data.shape,
torch.Size([8, 1, 128]))
self.assertEqual(layer.w13_weight_offset.data.shape,
torch.Size([8, 4, 64]))
self.assertEqual(layer.w2_weight_offset.data.shape,
torch.Size([8, 1, 128]))
self.assertTrue(layer.w13_weight_scale.data.is_contiguous())
self.assertTrue(layer.w2_weight_scale.data.is_contiguous())
self.assertTrue(layer.w13_weight_offset.data.is_contiguous())
self.assertTrue(layer.w2_weight_offset.data.is_contiguous())
def test_process_weights_after_loading_without_transpose(self):
layer = self.build_layer()
self.quant_method.transpose_weight = False
original_w13_data = layer.w13_weight_packed.data.clone()
original_w2_data = layer.w2_weight_packed.data.clone()
self.quant_method.process_weights_after_loading(layer)
self.assertTrue(
torch.equal(layer.w13_weight_packed.data, original_w13_data))
self.assertTrue(
torch.equal(layer.w2_weight_packed.data, original_w2_data))
@patch("vllm_ascend.quantization.methods.w4a16._EXTRA_CTX")
@patch("vllm_ascend.quantization.methods.w4a16.select_experts")
def test_apply_uses_explicit_dispatch_and_mlp_args(self, mock_select_experts, mock_extra_ctx):
tokens = 3
hidden_size = self.output_size
layer = self.build_layer()
x = torch.randn(tokens, hidden_size, dtype=torch.float32)
router_logits = torch.randn(tokens, self.experts, dtype=torch.float32)
topk_weights = torch.randn(tokens, 2, dtype=torch.float32)
topk_ids = torch.randint(0, self.experts, (tokens, 2), dtype=torch.int64)
mc2_mask = torch.tensor([1, 0, 1], dtype=torch.bool)
pertoken_scale = torch.randn(tokens, dtype=torch.float32)
mock_select_experts.return_value = (topk_weights, topk_ids)
mock_comm = Mock()
mock_comm.fused_experts.return_value = torch.randn(tokens, hidden_size, dtype=torch.float32)
mock_extra_ctx.moe_comm_method = mock_comm
mock_extra_ctx.moe_comm_type = MoECommType.ALLGATHER
self.quant_method.apply(
layer=layer,
x=x,
router_logits=router_logits,
top_k=2,
renormalize=True,
global_num_experts=self.experts,
activation="gelu",
apply_router_weight_on_input=True,
mc2_mask=mc2_mask,
pertoken_scale=pertoken_scale,
)
fused_experts_input = mock_comm.fused_experts.call_args.kwargs["fused_experts_input"]
self.assertEqual(fused_experts_input.activation, "gelu")
self.assertTrue(fused_experts_input.routing.apply_router_weight_on_input)
self.assertIs(fused_experts_input.routing.mc2_mask, mc2_mask)
self.assertIs(fused_experts_input.routing.pertoken_scale, pertoken_scale)