From 302494c1febfb648ac2a5001390bbd9b59db6bc2 Mon Sep 17 00:00:00 2001
From: Clorist33 <117881833+Clorist33@users.noreply.github.com>
Date: Wed, 24 Sep 2025 17:14:38 +0800
Subject: [PATCH] [EPLB] ut for EPLB (#3035)

## UT for EPLB

Co-authored-by Skywalker-EP 173723846@qq.com
Co-authored-by offline 0806@qq.com
Co-authored-by dsxsteven@sina.com

## UT Description

### 1. Module Description
- Module: EPLB

### 2. Covered Source Files
- vllm_ascend/eplb/adaptor/abstract_adaptor.py
- vllm_ascend/eplb/core/eplb_device_transfer_loader.py
- vllm_ascend/eplb/core/eplb_utils.py
- vllm_ascend/eplb/core/policy/policy_abstract.py
- vllm_ascend/eplb/core/policy/policy_dynamic_ep.py
- vllm_ascend/eplb/core/policy/policy_dynamic_ep_v2.py
- vllm_ascend/eplb/core/policy/policy_factory.py

### 3. Testing Method
- Framework: pytest
- Test Data: mock data
- Test Type: unit test

### 4. Coverage
- Statement Coverage: 90%

- vLLM version: v0.10.2
- vLLM main: https://github.com/vllm-project/vllm/commit/f225ea7dd98e9f29752e5c032cd4a8ee1d712f16

---------

Signed-off-by: tanqingshan (A) <50050625@china.huawei.com>
Signed-off-by: tanqingshan <50050625@china.huawei.com>
Signed-off-by: daishixun
Co-authored-by: tanqingshan (A)
Co-authored-by: tanqingshan <50050625@china.huawei.com>
Co-authored-by: daishixun
Co-authored-by: dsxsteven <36877507+dsxsteven@users.noreply.github.com>
---
 .../ut/eplb/adaptor/test_abstract_adaptor.py  |  73 +++++++++++
 .../eplb/core/policy/test_policy_abstract.py  |  31 +++++
 .../core/policy/test_policy_dynamic_ep.py     |  98 ++++++++++++++
 .../core/policy/test_policy_dynamic_ep_v2.py  |  99 ++++++++++++++
 .../ut/eplb/core/policy/test_policy_factor.py |  23 ++++
 .../core/test_eplb_device_transfer_loader.py  | 122 ++++++++++++++++++
 tests/ut/eplb/core/test_eplb_utils.py         |  79 ++++++++++++
 7 files changed, 525 insertions(+)
 create mode 100644 tests/ut/eplb/adaptor/test_abstract_adaptor.py
 create mode 100644 tests/ut/eplb/core/policy/test_policy_abstract.py
 create mode 100644 tests/ut/eplb/core/policy/test_policy_dynamic_ep.py
 create mode 100644 tests/ut/eplb/core/policy/test_policy_dynamic_ep_v2.py
 create mode 100644 tests/ut/eplb/core/policy/test_policy_factor.py
 create mode 100644 tests/ut/eplb/core/test_eplb_device_transfer_loader.py
 create mode 100644 tests/ut/eplb/core/test_eplb_utils.py

diff --git a/tests/ut/eplb/adaptor/test_abstract_adaptor.py b/tests/ut/eplb/adaptor/test_abstract_adaptor.py
new file mode 100644
index 0000000..a3d93ca
--- /dev/null
+++ b/tests/ut/eplb/adaptor/test_abstract_adaptor.py
@@ -0,0 +1,73 @@
+import pytest
+
+from vllm_ascend.eplb.adaptor.abstract_adaptor import EplbAdaptor
+
+
+class DummyAdaptor(EplbAdaptor):
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.args = kwargs
+
+    def get_rank_expert_workload(self):
+        return "workload"
+
+    def get_init_expert_map(self, num_moe_layers):
+        return {"layers": num_moe_layers}
+
+    def do_update_expert_map(self, layer_id, updated_expert_map):
+        return {"layer_id": layer_id, "map": updated_expert_map}
+
+    def do_update_expert_weight(self, layer_id, local_expert_to_replace,
+                                buffer_tensor_id):
+        return {
+            "layer_id": layer_id,
+            "replace": local_expert_to_replace,
+            "buffer": buffer_tensor_id,
+        }
+
+
+def test_base_class_methods_raise():
+    adaptor = EplbAdaptor()
+    with pytest.raises(NotImplementedError):
+        adaptor.get_rank_expert_workload()
+    with pytest.raises(NotImplementedError):
+        adaptor.get_init_expert_map(1)
+    with pytest.raises(NotImplementedError):
+        adaptor.do_update_expert_map(1, {})
+    with pytest.raises(NotImplementedError):
+        adaptor.do_update_expert_weight(1, "x", "y")
+
+
+def test_dummy_adaptor_init_and_args():
+    adaptor = DummyAdaptor(test_arg=123)
+    assert adaptor.args["test_arg"] == 123
+
+
+def test_get_rank_expert_workload():
+    adaptor = DummyAdaptor()
+    result = adaptor.get_rank_expert_workload()
+    assert result == "workload"
+
+
+def test_get_init_expert_map():
+    adaptor = DummyAdaptor()
+    result = adaptor.get_init_expert_map(5)
+    assert isinstance(result, dict)
+    assert result["layers"] == 5
+
+
+def test_do_update_expert_map():
+    adaptor = DummyAdaptor()
+    updated = {"expert": 1}
+    result = adaptor.do_update_expert_map(2, updated)
+    assert result["layer_id"] == 2
+    assert result["map"] == updated
+
+
+def test_do_update_expert_weight():
+    adaptor = DummyAdaptor()
+    result = adaptor.do_update_expert_weight(1, "expertA", "bufferX")
+    assert result["layer_id"] == 1
+    assert result["replace"] == "expertA"
+    assert result["buffer"] == "bufferX"
diff --git a/tests/ut/eplb/core/policy/test_policy_abstract.py b/tests/ut/eplb/core/policy/test_policy_abstract.py
new file mode 100644
index 0000000..26eb28b
--- /dev/null
+++ b/tests/ut/eplb/core/policy/test_policy_abstract.py
@@ -0,0 +1,31 @@
+# test_policy_abstract.py
+from vllm_ascend.eplb.core.policy.policy_abstract import (DynamicConfig,
+                                                          EplbPolicy)
+
+
+class DummyPolicy(EplbPolicy):
+
+    def rebalance_experts(self, current_expert_table, expert_workload):
+        return 1, current_expert_table
+
+
+def test_dynamic_config_attributes():
+    config = DynamicConfig()
+    assert config.placement_policy is None
+    assert config.max_transferred_expert_per_layer == 100
+    assert config.ep_worldsize == 64
+    assert config.num_die_per_host == 8
+
+
+def test_eplb_policy_init_and_method():
+    config = DynamicConfig()
+    policy = DummyPolicy(config)
+
+    assert policy.config == config
+
+    expert_table = [[0, 1, 2]]
+    workload = [10]
+    res, new_table = policy.rebalance_experts(expert_table, workload)
+
+    assert res == 1
+    assert new_table == expert_table
diff --git a/tests/ut/eplb/core/policy/test_policy_dynamic_ep.py b/tests/ut/eplb/core/policy/test_policy_dynamic_ep.py
new file mode 100644
index 0000000..f432d9b
--- /dev/null
+++ b/tests/ut/eplb/core/policy/test_policy_dynamic_ep.py
@@ -0,0 +1,98 @@
+from unittest.mock import patch
+
+import numpy as np
+import pytest
+
+from vllm_ascend.eplb.core.policy.policy_dynamic_ep import DynamicEplb
+
+
+class TestDynamicEplb:
+
+    def test_add_redundant_basic(self):
+        current_expert_table = np.array([[[0, 1], [1, 0]]])
+        expert_workload = np.array([[[2, 3], [4, 1]]])
+        num_original_expert = 2
+        result = DynamicEplb.add_redundant(current_expert_table,
+                                           expert_workload,
+                                           num_original_expert)
+        expected = np.array([[2 + 1, 3 + 4]])
+        assert np.array_equal(result, expected)
+
+    def test_get_redundant_num(self):
+        counts = np.array([2, 1, 3])
+        assert DynamicEplb.get_redundant_num(3, counts) == 3
+
+    def test_calculate_max_heat_per_layer(self):
+        workload_table = np.array([[[1, 2], [3, 4]], [[2, 2], [1, 1]]])
+        max_heat = DynamicEplb.calculate_max_heat_per_layer(workload_table, 2)
+        assert max_heat == [7, 4]
+
+    def test_constraint_expert_local_exchange(self):
+        current = [[[0, 1], [2, 3]]]
+        global_dep = [[[1, 0], [3, 2]]]
+        new_dep = DynamicEplb.constraint_expert_local_exchange(
+            current, global_dep)
+        assert new_dep == [[[0, 1], [2, 3]]]
+
+    def test_compute_balanced_pack_redundancy_normal(self):
+        origin_weights = [(0, 10), (1, 20)]
+        result, boxes = DynamicEplb.compute_balanced_pack_redundancy(
+            origin_weights, 2, 1)
+        assert isinstance(result, list) and len(result) == 2
+
+    def test_compute_balanced_pack_redundancy_card0(self):
+        origin_weights = [(0, 10)]
+        with pytest.raises(RuntimeError):
+            DynamicEplb.compute_balanced_pack_redundancy(origin_weights, 0, 0)
+
+    def test_compute_balanced_pack_normal(self):
+        origin_weights = np.array([(0, 10), (1, 20)], dtype=object)
+        result, boxes = DynamicEplb.compute_balanced_pack(origin_weights, 2)
+        assert isinstance(result, list) and len(result) == 2
+
+    def test_compute_balanced_pack_card0(self):
+        origin_weights = np.array([(0, 10)], dtype=object)
+        with pytest.raises(RuntimeError):
+            DynamicEplb.compute_balanced_pack(origin_weights, 0)
+
+    def test_original_compute_balanced_pack_redundancy(self):
+        origin_weights = [(0, 5), (1, 10)]
+        result, boxes = DynamicEplb.original_compute_balanced_pack_redundancy(
+            origin_weights, 2, 1)
+        assert isinstance(result, list) and len(result) == 2
+
+    def test_rebalance_experts_normal(self):
+        expert_table = np.array([[[0, 1], [1, 0]]])
+        workload = np.array([[[2, 3], [4, 1]]])
+        policy = DynamicEplb(config=None)
+        change, priority, new_dep = policy.rebalance_experts(
+            expert_table, workload)
+        assert change in [0, 1]
+        assert isinstance(priority, np.ndarray)
+        assert isinstance(new_dep, list)
+        assert np.array(new_dep).shape == expert_table.shape
+
+    def test_rebalance_experts_exceptions(self):
+        policy = DynamicEplb(config=None)
+
+        # case1: num_original_expert != expert_num
+        expert_table = np.array([[[0, 1], [1, 0]]])
+        workload = np.array([[[2, 3], [4, 1]]])
+        with patch.object(DynamicEplb,
+                          'add_redundant',
+                          return_value=np.array([[1, 2, 3]])):
+            with pytest.raises(ValueError):
+                policy.rebalance_experts(expert_table, workload)
+
+        # case2: num_npus <= 0
+        expert_table_zero = np.array([[]])  # 1 layer, 0 NPU, 0 experts
+        workload_zero = np.array([[]])
+        with pytest.raises(ValueError):
+            policy.rebalance_experts(expert_table_zero, workload_zero)
+
+        # case3: num_npus < num_redundancy_expert
+        expert_table_small = np.array([[[0, 0]]])  # 1 layer, 1 NPU, 2 experts
+        workload_small = np.array([[[1, 1]]])
+        with patch.object(DynamicEplb, 'get_redundant_num', return_value=2):
+            with pytest.raises(ValueError):
+                policy.rebalance_experts(expert_table_small, workload_small)
diff --git a/tests/ut/eplb/core/policy/test_policy_dynamic_ep_v2.py b/tests/ut/eplb/core/policy/test_policy_dynamic_ep_v2.py
new file mode 100644
index 0000000..eddd18c
--- /dev/null
+++ b/tests/ut/eplb/core/policy/test_policy_dynamic_ep_v2.py
@@ -0,0 +1,99 @@
+from typing import Dict, Set
+
+import numpy as np
+import pytest
+
+from vllm_ascend.eplb.core.policy.policy_dynamic_ep_v2 import (DynamicConfig,
+                                                               DynamicEplbV2)
+
+
+@pytest.fixture
+def config():
+    return DynamicConfig()
+
+
+@pytest.fixture
+def policy(config):
+    return DynamicEplbV2(config)
+
+
+def test_safe_operations(policy):
+    # safe_divide
+    assert policy.safe_divide(10, 2) == 5
+    assert policy.safe_divide(1, 0) == 0
+
+    # safe_exact_divide
+    assert policy.safe_exact_divide(10, 3) == 3
+    assert policy.safe_exact_divide(1, 0) == 0
+
+    # safe_mod
+    assert policy.safe_mod(10, 3) == 1
+    assert policy.safe_mod(1, 0) == 0
+
+
+def test_add_redundant():
+    workload = np.array([[[1, 2], [3, 4]]])
+    placement = np.array([[[0, 1], [0, 1]]])
+    result = DynamicEplbV2.add_redundant(placement, workload, 2)
+    assert result.shape == (1, 2)
+    assert np.all(result[0] == [4, 6])  # 0:1+3, 1:2+4
+
+
+def test_get_redundant_num():
+    counts = np.array([1, 2, 1])
+    assert DynamicEplbV2.get_redundant_num(3, counts) == 1  # sum(counts-1)
+
+
+def test_calculate_max_heat_per_layer():
+    workload = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
+    result = DynamicEplbV2.calculate_max_heat_per_layer(workload, 2)
+    assert result == [7, 15]
+
+
+def test_calculate_initial_imbalance(policy):
+    deployment = np.array([[[0, 1], [0, 1]]])
+    workloads = np.array([[1, 1]])
+    result = policy.calculate_initial_imbalance(deployment, workloads)
+    assert isinstance(result, list)
+    assert len(result) == 1
+
+
+def test_compute_redundant_assignments(policy):
+    base_experts = [(0, 10), (1, 5)]
+    redundant, sorted_weights = policy.compute_redundant_assignments(
+        base_experts, num_redundant_experts=2, num_experts=2)
+    assert len(redundant) == 2
+    assert len(sorted_weights) == 2
+
+
+def test_prepare_expert_list():
+    base_experts = [(0, 10), (1, 5)]
+    redundant_assignments = [[2], []]
+    result = DynamicEplbV2.prepare_expert_list(base_experts,
+                                               redundant_assignments, 1)
+    assert isinstance(result, list)
+    assert len(result) == 1
+
+
+def test_non_redundant_expert_information():
+    origin_deployment = np.array([[0, 1]])
+    updated_weights = [(0, 10), (1, 5)]
+    rendun_pos: Dict[int, Set[int]] = {0: set()}
+    assignments, weights, loads, counts = DynamicEplbV2.non_redundant_expert_information(
+        origin_deployment, updated_weights, rendun_pos)
+    assert assignments[0] == [0, 1]
+    assert loads[0] == 15
+
+
+def test_recomputing_initial_weight(policy):
+    layer_workloads = [10, 5]
+    device_assignments = [[0, 1]]
+    cur_layer_workload, num_all_experts = policy.recomputing_initial_weight(
+        layer_workloads, device_assignments)
+    assert cur_layer_workload[0] == 10
+    assert num_all_experts[0] == 1
+
+
+def test_safe_divide_zero_edge_case(policy):
+    assert policy.safe_divide(0, 1) == 0
+    assert policy.safe_divide(0, 5) == 0
diff --git a/tests/ut/eplb/core/policy/test_policy_factor.py b/tests/ut/eplb/core/policy/test_policy_factor.py
new file mode 100644
index 0000000..7894335
--- /dev/null
+++ b/tests/ut/eplb/core/policy/test_policy_factor.py
@@ -0,0 +1,23 @@
+import pytest
+
+from vllm_ascend.eplb.core.policy.policy_abstract import DynamicConfig
+from vllm_ascend.eplb.core.policy.policy_dynamic_ep import DynamicEplb
+from vllm_ascend.eplb.core.policy.policy_dynamic_ep_v2 import DynamicEplbV2
+from vllm_ascend.eplb.core.policy.policy_factory import PolicyFactory
+from vllm_ascend.eplb.core.policy.policy_random import RandomLoadBalance
+
+
+@pytest.fixture
+def dummy_config():
+    return DynamicConfig()
+
+
+@pytest.mark.parametrize("policy_type, expected_class", [
+    (0, RandomLoadBalance),
+    (1, DynamicEplb),
+    (2, DynamicEplbV2),
+    (999, RandomLoadBalance),
+])
+def test_generate_policy(policy_type, expected_class, dummy_config):
+    policy_instance = PolicyFactory.generate_policy(policy_type, dummy_config)
+    assert isinstance(policy_instance, expected_class)
diff --git a/tests/ut/eplb/core/test_eplb_device_transfer_loader.py b/tests/ut/eplb/core/test_eplb_device_transfer_loader.py
new file mode 100644
index 0000000..8835ff5
--- /dev/null
+++ b/tests/ut/eplb/core/test_eplb_device_transfer_loader.py
@@ -0,0 +1,122 @@
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import pytest
+import torch
+
+import vllm_ascend.eplb.core.eplb_device_transfer_loader as loader
+
+
+@pytest.fixture
+def mock_adaptor():
+    adaptor = MagicMock()
+
+    adaptor.expert_map_per_layer_cpu = {
+        0: {
+            10: torch.tensor(1),
+            20: torch.tensor(0)
+        }
+    }
+
+    adaptor.expert_param_per_layer = {
+        0: {
+            0: [[torch.tensor([1.0])]],
+            1: [[torch.tensor([2.0])]]
+        }
+    }
+
+    adaptor.buffer_tensor_list = [[[torch.tensor([3.0])],
+                                   [torch.tensor([4.0])]]]
+    return adaptor
+
+
+def test_generate_task_and_state_flow(mock_adaptor):
+    loader_obj = loader.D2DExpertWeightLoader()
+    loader_obj.set_adator(mock_adaptor)
+
+    with patch("torch.distributed.P2POp") as mock_p2p, \
+         patch("torch.distributed.isend", return_value="isend_op"), \
+         patch("torch.distributed.irecv", return_value="irecv_op"):
+
+        mock_p2p.side_effect = lambda op, tensor, rank: (op, tensor, rank)
+
+        loader_obj.state = loader.ExpertWeightUpdateState.READY
+        loader_obj.generate_expert_d2d_transfer_task([(1, 10)], [(2, 20)],
+                                                     {20: torch.tensor(0)}, 0)
+        assert loader_obj.comm_op_list is None
+        loader_obj.state = loader.ExpertWeightUpdateState.WAITING
+
+        loader_obj.generate_expert_d2d_transfer_task([], [], {}, 0)
+        assert loader_obj.comm_op_list is None
+
+        updated_map = {20: torch.tensor(0)}
+        loader_obj.generate_expert_d2d_transfer_task([(1, 10)], [(2, 20)],
+                                                     updated_map, 0)
+        assert loader_obj.state == loader.ExpertWeightUpdateState.READY
+        assert loader_obj.comm_op_list
+        assert loader_obj.recv_expert_list
+
+
+def test_asyn_transfer_and_update(mock_adaptor):
+    loader_obj = loader.D2DExpertWeightLoader()
+    loader_obj.set_adator(mock_adaptor)
+
+    loader_obj.comm_op_list = ["fake_op"]
+    loader_obj.state = loader.ExpertWeightUpdateState.READY
+
+    reqs: list[MagicMock] = []
+
+    with patch("torch.distributed.batch_isend_irecv",
+               return_value=[MagicMock(), MagicMock()]):
+        loader_obj.asyn_expert_weight_transfer(reqs)
+
+    assert loader_obj.state == loader.ExpertWeightUpdateState.TRANSFERRING
+    assert len(reqs) > 0
+
+    mock_req = MagicMock()
+    mock_req.wait.return_value = None
+    reqs = [mock_req]
+
+    loader_obj.recv_expert_list = [(0, 0)]
+    loader_obj.updated_expert_map = {20: torch.tensor(0)}
+    loader_obj.updated_log2phy_map = {"dummy": 1}
+    loader_obj.layer_id = 0
+    loader_obj.comm_op_list = ["op"]
+
+    loader_obj.update_expert_map_and_weight(reqs)
+
+    mock_adaptor.do_update_expert_map.assert_called_once()
+    mock_adaptor.do_update_log2phy_map.assert_called_once()
+    mock_adaptor.do_update_expert_weight.assert_called_once()
+
+    assert loader_obj.state == loader.ExpertWeightUpdateState.WAITING
+    assert loader_obj.recv_expert_list == []
+
+
+def test_set_log2phy_map(mock_adaptor):
+    loader_obj = loader.D2DExpertWeightLoader()
+    loader_obj.set_adator(mock_adaptor)
+    loader_obj.set_log2phy_map({"a": 1})
+    assert loader_obj.updated_log2phy_map == {"a": 1}
+
+
+def test_invalid_state_asyn_update(mock_adaptor):
+    loader_obj = loader.D2DExpertWeightLoader()
+    loader_obj.set_adator(mock_adaptor)
+
+    loader_obj.state = loader.ExpertWeightUpdateState.WAITING
+    reqs: list[Any] = []
+    loader_obj.asyn_expert_weight_transfer(reqs)
+    assert reqs == []
+
+    loader_obj.state = loader.ExpertWeightUpdateState.READY
+    loader_obj.update_expert_map_and_weight([])
+
+    assert not mock_adaptor.do_update_expert_map.called
+
+
+def test_load_impl_not_implemented(mock_adaptor):
+    loader_obj = loader.D2DExpertWeightLoader()
+    loader_obj.set_adator(mock_adaptor)
+    with pytest.raises(NotImplementedError):
+        loader_obj.load_impl({}, {})
diff --git a/tests/ut/eplb/core/test_eplb_utils.py b/tests/ut/eplb/core/test_eplb_utils.py
new file mode 100644
index 0000000..8a9761f
--- /dev/null
+++ b/tests/ut/eplb/core/test_eplb_utils.py
@@ -0,0 +1,79 @@
+import random
+
+import torch
+
+from vllm_ascend.eplb.core import eplb_utils
+
+
+def test_determine_default_expert_map_single_world():
+    count, expert_map = eplb_utils.determine_default_expert_map(
+        global_expert_num=4,
+        world_size=1,
+        rank_id=0,
+        global_redundant_expert_num=0)
+    assert count == 4
+    assert torch.equal(expert_map, torch.arange(4, dtype=torch.int32))
+
+
+def test_determine_default_expert_map_multiple_worlds_no_redundant():
+    count, expert_map = eplb_utils.determine_default_expert_map(
+        global_expert_num=8,
+        world_size=2,
+        rank_id=0,
+        global_redundant_expert_num=0)
+
+    assert count == 4
+    assert torch.all(expert_map[:4] >= 0)
+    assert torch.all(expert_map[4:] == -1)
+
+
+def test_determine_default_expert_map_multiple_worlds_with_redundant():
+    count, expert_map = eplb_utils.determine_default_expert_map(
+        global_expert_num=5,
+        world_size=2,
+        rank_id=0,
+        global_redundant_expert_num=1)
+
+    assert count == 3
+    assert torch.all(expert_map[0:3] >= 0)
+
+
+def test_generate_log2phy_map_single_rank_holding():
+
+    expert_map = torch.tensor([[0, -1], [-1, 0]], dtype=torch.int32)
+    log2phy_map = eplb_utils.generate_log2phy_map(expert_map)
+
+    assert torch.all(log2phy_map[:, 0] == log2phy_map[0, 0])
+    assert torch.all(log2phy_map[:, 1] == log2phy_map[1, 1])
+
+
+def test_generate_log2phy_map_multiple_rank_holding(monkeypatch):
+
+    expert_map = torch.tensor([[0], [0]], dtype=torch.int32)
+
+    monkeypatch.setattr(random, "choice", lambda x: x[0])
+
+    log2phy_map = eplb_utils.generate_log2phy_map(expert_map)
+
+    assert log2phy_map.shape == (2, 1)
+    assert (log2phy_map >= 0).all()
+
+
+def test_determine_default_log2phy_map_world_size_1():
+    log2phy = eplb_utils.determine_default_log2phy_map(
+        global_expert_num=3,
+        world_size=1,
+        rank_id=0,
+        global_redundant_expert_num=0)
+    assert log2phy.shape == (3, )
+    assert (log2phy >= 0).all()
+
+
+def test_determine_default_log2phy_map_world_size_multiple():
+    log2phy = eplb_utils.determine_default_log2phy_map(
+        global_expert_num=6,
+        world_size=2,
+        rank_id=1,
+        global_redundant_expert_num=1)
+    assert log2phy.shape == (6, )
+    assert (log2phy >= 0).all()