Files
xc-llm-ascend/tests/ut/worker/test_model_runner_v1.py
lidenghui1110 a82b0fa70e mooncake connector support pipeline parallel & fix pp with flashcomm1 (#4054)
### What this PR does / why we need it?
To support pipeline parallel with PD disaggregation, this PR support PP
in mooncake connector and fix other bugs when enable pp with other
optimization params, including following changes:
- mooncake connector support pp in prefill, we do not support decode pp
currently
- fix bugs when enable both pp and flashcomm1
- optimize ascend-scheduler to support full batch in multiple pipeline
stages, original implementation would cause all pipeline stages
batch_size total summed to max_num_seq, which makes pipeline is not
full, this optimization can make all stages running with full batch_size
= max_num_seq, the same changes will contribute to vllm scheduler too.

### Does this PR introduce _any_ user-facing change?
add `pp_size` in mooncake connector kv_connector_extra_config
```
"kv_connector_extra_config": {
            "use_ascend_direct": true,
            "prefill": {
                    "dp_size": 1,
                    "tp_size": 4,
                    "pp_size": 4
             },
             "decode": {
                    "dp_size": 16,
                    "tp_size": 1
             }
        }
```

### How was this patch tested?

- vLLM version: v0.12.0
- vLLM main:
ad32e3e19c

---------

Signed-off-by: chenxiao <Jaychou1620@Gmail.com>
Signed-off-by: Kurumi5210 <Jaychou1620@Gmail.com>
Signed-off-by: Kurumi5210 <jaychou1620@gmail.com>
Signed-off-by: 秋刀鱼 <jaychou1620@Gmail.com>
Co-authored-by: chenxiao <Jaychou1620@Gmail.com>
Co-authored-by: zss <zss@qq.com>
Co-authored-by: zss <3265779424@qq.com>
2025-12-10 16:01:43 +08:00

114 lines
5.0 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
from unittest.mock import MagicMock, patch
import pytest
from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.utils import AscendDeviceType
from vllm_ascend.worker.model_runner_v1 import NPUModelRunner
# yapf: disable
@pytest.mark.parametrize(
"soc_version, enable_expert_parallel, world_size, pipeline_size, num_tokens, mc2_tokens_capacity, quant_type, expected_method",
[
# Case 1: Expert parallel is disabled, should always be 'allgather'
(AscendDeviceType._910B, False, 8, 2, 100, 256, None, MoECommType.ALLGATHER),
(AscendDeviceType._910_93, False, 16, 2, 500, 256, None, MoECommType.ALLGATHER),
# Case 2: A2 SOC with w4a8_dynamic -> use alltoall when not mc2
(AscendDeviceType._910B, True, 8, 1, 100, 256, "w4a8_dynamic", MoECommType.ALLTOALL),
(AscendDeviceType._910B, True, 16, 1, 257, 256, "w4a8_dynamic", MoECommType.ALLTOALL),
(AscendDeviceType._910B, True, 16, 1, 100, 256, "w4a8_dynamic", MoECommType.MC2), # meets mc2 condition
# Case 3: A2 SOC without w4a8_dynamic -> fallback to allgather
(AscendDeviceType._910B, True, 8, 2, 100, 256, None, MoECommType.ALLGATHER),
(AscendDeviceType._910B, True, 16, 2, 257, 256, None, MoECommType.ALLGATHER),
# Case 4: A3 SOC
(AscendDeviceType._910_93, True, 8, 2, 100, 256, None, MoECommType.MC2),
(AscendDeviceType._910_93, True, 8, 2, 257, 256, None, MoECommType.ALLTOALL),
])
# yapf: enable
def test_select_moe_comm_method(soc_version, enable_expert_parallel,
world_size, pipeline_size, num_tokens,
mc2_tokens_capacity, quant_type,
expected_method):
"""
Tests the _select_moe_comm_method with various configurations including quant_type.
"""
# Mock the NPUModelRunner instance and its dependencies
mock_runner = MagicMock(spec=NPUModelRunner)
mock_runner.parallel_config = MagicMock()
mock_runner.parallel_config.enable_expert_parallel = enable_expert_parallel
mock_runner.parallel_config.world_size_across_dp = world_size
mock_runner.parallel_config.pipeline_parallel_size = pipeline_size
mock_runner.mc2_tokens_capacity = mc2_tokens_capacity
# Add vllm_config.model_config.hf_config mock with moe_quantize
mock_hf_config = MagicMock()
mock_hf_config.moe_quantize = quant_type
mock_model_config = MagicMock()
mock_model_config.hf_config = mock_hf_config
mock_vllm_config = MagicMock()
mock_vllm_config.model_config = mock_model_config
mock_runner.vllm_config = mock_vllm_config
# Patch the helper functions
with patch('vllm_ascend.worker.model_runner_v1.get_ascend_device_type',
return_value=soc_version), \
patch('vllm_ascend.worker.model_runner_v1.is_global_first_rank',
return_value=True), \
patch('vllm_ascend.worker.model_runner_v1.is_moe_model',
return_value=True):
# Bind the real method to the mock object
method = NPUModelRunner._select_moe_comm_method(
mock_runner, num_tokens)
# Assert the result
assert method == expected_method
def test_select_moe_comm_method_unsupported_soc():
"""
Tests that _select_moe_comm_method raises ValueError for an unsupported SOC.
"""
mock_runner = MagicMock(spec=NPUModelRunner)
mock_runner.parallel_config = MagicMock()
mock_runner.parallel_config.enable_expert_parallel = True
mock_runner.mc2_tokens_capacity = 256
# Add vllm_config.model_config.hf_config mock with moe_quantize
mock_hf_config = MagicMock()
mock_hf_config.moe_quantize = None
mock_model_config = MagicMock()
mock_model_config.hf_config = mock_hf_config
mock_vllm_config = MagicMock()
mock_vllm_config.model_config = mock_model_config
mock_runner.vllm_config = mock_vllm_config
unsupported_soc = "UnsupportedSOC"
with patch('vllm_ascend.worker.model_runner_v1.get_ascend_device_type',
return_value=unsupported_soc), \
patch('vllm_ascend.worker.model_runner_v1.is_global_first_rank',
return_value=True), \
patch('vllm_ascend.worker.model_runner_v1.is_moe_model',
return_value=True), \
pytest.raises(ValueError, match=f"Unsupported soc_version: {unsupported_soc}"):
NPUModelRunner._select_moe_comm_method(mock_runner, 100)