Files
xc-llm-ascend/tests/ut/quantization/test_quant_config.py
xuyexiong 26fc36b0e0 [V1] MTP supports torchair (#2145)
### What this PR does / why we need it?
Support MTP  with:

- [x]  V0 Scheduler
- [x]  TorchAir
- [x]  Single DP
- [x]  Multi DP
- [x]  Disaggregate PD

Known issues:
- [ ] Not support V1 Scheduler (chunked prefill), will be supported in a
few weeks
- [ ] vllm v0.10.0 does not support metrics with `DP > 1` right now,
need to comment out the line 171-175 in file
`vllm/vllm/v1/metrics/loggers.py`
```
            if (len(self.engine_indexes) > 1
                and vllm_config.speculative_config is not None):
            raise NotImplementedError("Prometheus metrics with Spec Decoding "
                                      "with >1 EngineCore per AsyncLLM is not "
                                      "supported yet.")
```

To start an online server with torchair enabled, here is an example:
```
python -m vllm.entrypoints.openai.api_server \
 --model="/weights/DeepSeek-R1_w8a8/" \
 --trust-remote-code \
 --max-model-len 40000 \
 --tensor-parallel-size 4 \
 --data_parallel_size 4 \
 --max-num-seqs 16 \
 --no-enable-prefix-caching \
 --enable_expert_parallel \
 --served-model-name deepseekr1 \
 --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' \
 --quantization ascend \
 --host 0.0.0.0 \
 --port 1234 \
 --additional-config '{"ascend_scheduler_config":{"enabled":true,"enable_chunked_prefill":false},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]},"enable_weight_nz_layout":true}' \
 --gpu_memory_utilization 0.9 
``` 

offline example with torchair enabled
```
from vllm import LLM, SamplingParams

prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]

# Create a sampling params object.
sampling_params = SamplingParams(max_tokens=16, temperature=0)
# Create an LLM.
llm = LLM(
    model="/home/data/DeepSeek-R1_w8a8/",
    tensor_parallel_size=16,
    max_num_seqs=16,
    gpu_memory_utilization=0.9,
    distributed_executor_backend="mp",
    enable_expert_parallel=True,
    speculative_config={
        "method": "deepseek_mtp",
        "num_speculative_tokens": 1,
    },
    trust_remote_code=True,
    enforce_eager=False,
    max_model_len=2000,
    additional_config = {
       'torchair_graph_config': {
            'enabled': True,
            "graph_batch_sizes": [16],
            'enable_multistream_shared_expert': False,
        },
       "ascend_scheduler_config": {
            "enabled": True
        },
        # 'expert_tensor_parallel_size': 16,
    }
)

# Generate texts from the prompts.
# llm.start_profile()
outputs = llm.generate(prompts, sampling_params)
# llm.stop_profile()
for output in outputs:
    prompt = output.prompt
    generated_text = output.outputs[0].text
    print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")
```

- vLLM version: v0.10.0
- vLLM main:
302962e806

---------

Signed-off-by: xuyexiong <xuyexiong@huawei.com>
2025-08-06 19:37:43 +08:00

233 lines
10 KiB
Python

from unittest.mock import MagicMock, patch
import torch
from vllm.attention.layer import Attention
from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.fused_moe.config import FusedMoEConfig
from vllm.model_executor.layers.linear import (LinearBase,
UnquantizedLinearMethod)
from tests.ut.base import TestBase
from vllm_ascend.quantization.quant_config import (AscendKVCacheMethod,
AscendQuantConfig)
ASCEND_QUATIZATION_METHOD = "ascend"
class TestAscendQuantConfig(TestBase):
def setUp(self):
self.sample_config = {
"weight": "INT8",
"fa_quant_type": "C8",
"kv_quant_type": "C8",
"layer1.weight": "INT8",
"layer2.weight": "FLOAT",
"fused_layer.weight": "FLOAT",
"fused_layer.shard1.weight": "FLOAT",
"fused_layer.shard2.weight": "FLOAT",
"shard1.weight": "FLOAT",
"shard2.weight": "FLOAT",
}
self.ascend_config = AscendQuantConfig(self.sample_config)
self.ascend_config.packed_modules_mapping = None
def test_init(self):
self.assertEqual(self.ascend_config.quant_description,
self.sample_config)
def test_repr(self):
repr_str = repr(self.ascend_config)
self.assertTrue(repr_str.startswith("AscendQuantConfig:\n"))
def test_get_name(self):
self.assertEqual(AscendQuantConfig.get_name(),
ASCEND_QUATIZATION_METHOD)
def test_get_supported_act_dtypes(self):
supported_dtypes = AscendQuantConfig.get_supported_act_dtypes()
self.assertEqual(len(supported_dtypes), 3)
def test_get_min_capability(self):
with self.assertRaises(NotImplementedError):
AscendQuantConfig.get_min_capability()
def test_get_config_filenames(self):
filenames = AscendQuantConfig.get_config_filenames()
self.assertEqual(filenames, ["quant_model_description.json"])
def test_from_config(self):
config = AscendQuantConfig.from_config(self.sample_config)
self.assertIsInstance(config, AscendQuantConfig)
self.assertEqual(config.quant_description, self.sample_config)
@patch('torch.npu.is_available')
def test_override_quantization_method(self, mock_is_available):
# Test when NPU is available
mock_is_available.return_value = True
result = AscendQuantConfig.override_quantization_method(None, None)
self.assertEqual(result, ASCEND_QUATIZATION_METHOD)
# Test when NPU is not available
mock_is_available.return_value = False
result = AscendQuantConfig.override_quantization_method(None, None)
self.assertIsNone(result)
def test_get_quant_method_for_linear(self):
linear_layer = MagicMock(spec=LinearBase)
# Test skipped layer
with patch.object(self.ascend_config,
'is_layer_skipped_ascend',
return_value=True):
method = self.ascend_config.get_quant_method(linear_layer, ".attn")
self.assertIsInstance(method, UnquantizedLinearMethod)
# Test quantized layer
with patch.object(self.ascend_config, 'is_layer_skipped_ascend', return_value=False), \
patch('vllm_ascend.quantization.quant_config.AscendLinearMethod', return_value=MagicMock()) as mock_ascend_linear:
method = self.ascend_config.get_quant_method(linear_layer, ".attn")
self.assertIs(method, mock_ascend_linear.return_value)
mock_ascend_linear.assert_called_once_with(
self.ascend_config, ".attn",
self.ascend_config.packed_modules_mapping)
def test_get_quant_method_for_attention(self):
attention_layer = MagicMock(spec=Attention)
with patch('vllm_ascend.quantization.quant_config.AscendKVCacheMethod',
return_value=MagicMock()) as mock_ascend_kvcache:
# Test with fa_quant_type
method = self.ascend_config.get_quant_method(
attention_layer, ".attn")
self.assertIs(method, mock_ascend_kvcache.return_value)
with patch('vllm_ascend.quantization.quant_config.AscendKVCacheMethod',
return_value=MagicMock()) as mock_ascend_kvcache:
# Test with kv_quant_type
modified_config = {"kv_quant_type": "C8"}
config = AscendQuantConfig(modified_config)
config.packed_modules_mapping = None
method = config.get_quant_method(attention_layer, "attn")
self.assertIs(method, mock_ascend_kvcache.return_value)
def test_get_quant_method_for_fused_moe(self):
fused_moe_layer = MagicMock(spec=FusedMoE)
fused_moe_layer.moe = MagicMock(spec=FusedMoEConfig)
# Test skipped layer
with patch.object(self.ascend_config, 'is_layer_skipped_ascend', return_value=True), \
patch('vllm_ascend.quantization.quant_config.AscendUnquantizedFusedMoEMethod', return_value=MagicMock()) as mock_ascend_moe:
method = self.ascend_config.get_quant_method(
fused_moe_layer, "moe_layer")
self.assertIs(method, mock_ascend_moe.return_value)
# Test quantized layer
with patch.object(self.ascend_config, 'is_layer_skipped_ascend', return_value=False), \
patch('vllm_ascend.quantization.quant_config.AscendFusedMoEMethod', return_value=MagicMock()) as mock_ascend_moe:
method = self.ascend_config.get_quant_method(
fused_moe_layer, "moe_layer")
self.assertIs(method, mock_ascend_moe.return_value)
def test_is_layer_skipped_ascend(self):
# Test non-fused layer that should be quantized
self.assertFalse(self.ascend_config.is_layer_skipped_ascend("layer1"))
# Test non-fused layer that should be skipped
self.assertTrue(self.ascend_config.is_layer_skipped_ascend("layer2"))
# Test fused layer
fused_mapping = {"fused_layer": ["shard1", "shard2"]}
self.assertTrue(
self.ascend_config.is_layer_skipped_ascend("fused_layer",
fused_mapping))
# Test inconsistent fused layer shards
bad_config = {"shard1.weight": "FLOAT", "shard2.weight": "INT8"}
config = AscendQuantConfig(bad_config)
with self.assertRaises(ValueError):
config.is_layer_skipped_ascend("fused_layer", fused_mapping)
def test_get_scaled_act_names(self):
self.assertEqual(self.ascend_config.get_scaled_act_names(), [])
class TestAscendKVCacheMethod(TestBase):
def setUp(self):
# Setup common test fixtures
self.mock_quant_config = MagicMock(spec=AscendQuantConfig)
self.mock_quant_config.quant_description = {"some_config": "value"}
self.prefix = "attention_layer"
# Mock the quantizer and quant_method
self.mock_quantizer = MagicMock()
self.mock_quant_method = MagicMock()
# Patch the AscendQuantizer
self.quantizer_patcher = patch(
'vllm_ascend.quantization.quant_config.AscendQuantizer.get_quantizer',
return_value=self.mock_quantizer)
self.mock_get_quantizer = self.quantizer_patcher.start()
self.mock_quantizer.build_attention_method.return_value = self.mock_quant_method
# Create instance
self.kv_cache_method = AscendKVCacheMethod(self.mock_quant_config,
self.prefix)
def tearDown(self):
self.quantizer_patcher.stop()
def test_init(self):
"""Test initialization with proper quantizer setup."""
self.mock_get_quantizer.assert_called_once_with(
self.mock_quant_config.quant_description, self.prefix)
self.mock_quantizer.build_attention_method.assert_called_once()
def test_create_weights(self):
"""Test create_weights delegates to quant_method."""
mock_layer = MagicMock()
self.kv_cache_method.create_weights(mock_layer)
self.mock_quant_method.create_weights.assert_called_once_with(
mock_layer)
def test_process_weights_after_loading_with_method(self):
"""Test process_weights when quant_method has the method."""
mock_layer = MagicMock()
self.kv_cache_method.process_weights_after_loading(mock_layer)
self.mock_quant_method.process_weights_after_loading.assert_called_once_with(
mock_layer)
def test_process_weights_after_loading_without_method(self):
"""Test process_weights when quant_method lacks the method."""
# Reset mock to remove the method
del self.mock_quant_method.process_weights_after_loading
mock_layer = MagicMock()
# Should not raise exception
self.kv_cache_method.process_weights_after_loading(mock_layer)
def test_apply_delegation(self):
"""Test apply properly delegates to quant_method."""
mock_layer = MagicMock()
mock_query = torch.randn(1, 32, 128)
mock_key = torch.randn(1, 32, 128)
mock_value = torch.randn(1, 32, 128)
mock_kv_cache = MagicMock()
mock_attn_metadata = MagicMock()
mock_scale = 1.0
mock_output = torch.zeros(1, 32, 128)
mock_attn_type = MagicMock()
expected_result = torch.randn(1, 32, 128)
self.mock_quant_method.apply.return_value = expected_result
result = self.kv_cache_method.apply(mock_layer, mock_query, mock_key,
mock_value, mock_kv_cache,
mock_attn_metadata, mock_attn_type,
mock_scale, mock_output)
self.mock_quant_method.apply.assert_called_once_with(
mock_layer, mock_query, mock_key, mock_value, mock_kv_cache,
mock_attn_metadata, mock_attn_type, mock_scale, mock_output)
self.assertTrue(torch.equal(result, expected_result))