[EPLB] ut for EPLB (#3035)

## UT for EPLB

Co-authored-by: Skywalker-EP <173723846@qq.com>
Co-authored-by: offline <0806@qq.com>
Co-authored-by: dsxsteven <dsxsteven@sina.com>

## UT Description

### 1. Module Description
- Module: EPLB

### 2. Covered Source Files
- vllm_ascend/eplb/adaptor/abstract_adaptor.py
- vllm_ascend/eplb/core/eplb_device_transfer_loader.py
- vllm_ascend/eplb/core/eplb_utils.py
- vllm_ascend/eplb/core/policy/policy_abstract.py
- vllm_ascend/eplb/core/policy/policy_dynamic_ep.py
- vllm_ascend/eplb/core/policy/policy_dynamic_ep_v2.py
- vllm_ascend/eplb/core/policy/policy_factory.py

### 3. Testing Method
- Framework: pytest
- Test Data: mock data
- Test Type: unit test

### 4. Coverage
- Statement Coverage: 90%


- vLLM version: v0.10.2
- vLLM main:
f225ea7dd9

---------

Signed-off-by: tanqingshan (A)  <50050625@china.huawei.com>
Signed-off-by: tanqingshan <50050625@china.huawei.com>
Signed-off-by: daishixun <dsxsteven@sina.com>
Co-authored-by: tanqingshan (A) <t50050625@china.huawei.com>
Co-authored-by: tanqingshan <50050625@china.huawei.com>
Co-authored-by: daishixun <dsxsteven@sina.com>
Co-authored-by: dsxsteven <36877507+dsxsteven@users.noreply.github.com>
This commit is contained in:
Clorist33
2025-09-24 17:14:38 +08:00
committed by GitHub
parent 80524f5711
commit 302494c1fe
7 changed files with 525 additions and 0 deletions

View File

@@ -0,0 +1,73 @@
import pytest
from vllm_ascend.eplb.adaptor.abstract_adaptor import EplbAdaptor
class DummyAdaptor(EplbAdaptor):
    """Concrete EplbAdaptor for the tests; each method echoes its inputs."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor kwargs so tests can inspect them later.
        self.args = kwargs

    def get_rank_expert_workload(self):
        """Return a fixed marker value."""
        return "workload"

    def get_init_expert_map(self, num_moe_layers):
        """Echo the requested layer count back in a dict."""
        return {"layers": num_moe_layers}

    def do_update_expert_map(self, layer_id, updated_expert_map):
        """Echo the map-update arguments back."""
        return {"layer_id": layer_id, "map": updated_expert_map}

    def do_update_expert_weight(self, layer_id, local_expert_to_replace,
                                buffer_tensor_id):
        """Echo the weight-update arguments back."""
        echoed = {}
        echoed["layer_id"] = layer_id
        echoed["replace"] = local_expert_to_replace
        echoed["buffer"] = buffer_tensor_id
        return echoed
def test_base_class_methods_raise():
    """Every hook on the abstract base adaptor must raise NotImplementedError."""
    adaptor = EplbAdaptor()
    unimplemented_calls = [
        lambda: adaptor.get_rank_expert_workload(),
        lambda: adaptor.get_init_expert_map(1),
        lambda: adaptor.do_update_expert_map(1, {}),
        lambda: adaptor.do_update_expert_weight(1, "x", "y"),
    ]
    for call in unimplemented_calls:
        with pytest.raises(NotImplementedError):
            call()
def test_dummy_adaptor_init_and_args():
    """Constructor kwargs are retained verbatim on the instance."""
    adaptor = DummyAdaptor(test_arg=123)
    assert adaptor.args.get("test_arg") == 123
def test_get_rank_expert_workload():
    """The dummy adaptor reports its fixed workload marker."""
    assert DummyAdaptor().get_rank_expert_workload() == "workload"
def test_get_init_expert_map():
    """The layer count is echoed back under the 'layers' key."""
    mapping = DummyAdaptor().get_init_expert_map(5)
    assert isinstance(mapping, dict)
    assert mapping["layers"] == 5
def test_do_update_expert_map():
    """Both the layer id and the new map are passed straight through."""
    new_map = {"expert": 1}
    echoed = DummyAdaptor().do_update_expert_map(2, new_map)
    assert echoed["layer_id"] == 2
    assert echoed["map"] == new_map
def test_do_update_expert_weight():
    """All three weight-update arguments are passed straight through."""
    echoed = DummyAdaptor().do_update_expert_weight(1, "expertA", "bufferX")
    assert echoed["layer_id"] == 1
    assert echoed["replace"] == "expertA"
    assert echoed["buffer"] == "bufferX"

View File

@@ -0,0 +1,31 @@
# test_policy_abstract.py
from vllm_ascend.eplb.core.policy.policy_abstract import (DynamicConfig,
EplbPolicy)
class DummyPolicy(EplbPolicy):
    """Minimal policy: reports success and hands the table back unchanged."""

    def rebalance_experts(self, current_expert_table, expert_workload):
        # 1 signals "rebalanced"; the incoming table is echoed untouched.
        return 1, current_expert_table
def test_dynamic_config_attributes():
    """DynamicConfig ships with the expected default attribute values."""
    cfg = DynamicConfig()
    assert cfg.placement_policy is None
    assert cfg.max_transferred_expert_per_layer == 100
    assert cfg.ep_worldsize == 64
    assert cfg.num_die_per_host == 8
def test_eplb_policy_init_and_method():
    """The base policy stores its config; the dummy echoes the table back."""
    cfg = DynamicConfig()
    dummy = DummyPolicy(cfg)
    assert dummy.config == cfg
    table = [[0, 1, 2]]
    status, rebalanced = dummy.rebalance_experts(table, [10])
    assert status == 1
    assert rebalanced == table

View File

@@ -0,0 +1,98 @@
from unittest.mock import patch
import numpy as np
import pytest
from vllm_ascend.eplb.core.policy.policy_dynamic_ep import DynamicEplb
class TestDynamicEplb:
    """Unit tests for DynamicEplb's static helpers and rebalance flow."""

    def test_add_redundant_basic(self):
        # Two cards host experts {0,1} in opposite order; workloads are
        # summed per logical expert across both cards.
        placement = np.array([[[0, 1], [1, 0]]])
        workload = np.array([[[2, 3], [4, 1]]])
        merged = DynamicEplb.add_redundant(placement, workload, 2)
        # Expert 0: 2 + 1, expert 1: 3 + 4.
        assert np.array_equal(merged, np.array([[3, 7]]))

    def test_get_redundant_num(self):
        # Redundancy is sum(counts - 1): (2-1) + (1-1) + (3-1) == 3.
        assert DynamicEplb.get_redundant_num(3, np.array([2, 1, 3])) == 3

    def test_calculate_max_heat_per_layer(self):
        table = np.array([[[1, 2], [3, 4]], [[2, 2], [1, 1]]])
        # Hottest card per layer: 3+4 == 7 and 2+2 == 4.
        assert DynamicEplb.calculate_max_heat_per_layer(table, 2) == [7, 4]

    def test_constraint_expert_local_exchange(self):
        local = [[[0, 1], [2, 3]]]
        proposed = [[[1, 0], [3, 2]]]
        exchanged = DynamicEplb.constraint_expert_local_exchange(
            local, proposed)
        # Experts are swapped back into their original local slots.
        assert exchanged == [[[0, 1], [2, 3]]]

    def test_compute_balanced_pack_redundancy_normal(self):
        packed, _boxes = DynamicEplb.compute_balanced_pack_redundancy(
            [(0, 10), (1, 20)], 2, 1)
        assert isinstance(packed, list)
        assert len(packed) == 2

    def test_compute_balanced_pack_redundancy_card0(self):
        # Zero cards is an invalid packing target.
        with pytest.raises(RuntimeError):
            DynamicEplb.compute_balanced_pack_redundancy([(0, 10)], 0, 0)

    def test_compute_balanced_pack_normal(self):
        weights = np.array([(0, 10), (1, 20)], dtype=object)
        packed, _boxes = DynamicEplb.compute_balanced_pack(weights, 2)
        assert isinstance(packed, list)
        assert len(packed) == 2

    def test_compute_balanced_pack_card0(self):
        with pytest.raises(RuntimeError):
            DynamicEplb.compute_balanced_pack(
                np.array([(0, 10)], dtype=object), 0)

    def test_original_compute_balanced_pack_redundancy(self):
        packed, _boxes = DynamicEplb.original_compute_balanced_pack_redundancy(
            [(0, 5), (1, 10)], 2, 1)
        assert isinstance(packed, list)
        assert len(packed) == 2

    def test_rebalance_experts_normal(self):
        placement = np.array([[[0, 1], [1, 0]]])
        workload = np.array([[[2, 3], [4, 1]]])
        changed, priority, deployment = DynamicEplb(
            config=None).rebalance_experts(placement, workload)
        assert changed in (0, 1)
        assert isinstance(priority, np.ndarray)
        assert isinstance(deployment, list)
        # The new deployment keeps the (layers, cards, slots) shape.
        assert np.array(deployment).shape == placement.shape

    def test_rebalance_experts_exceptions(self):
        policy = DynamicEplb(config=None)
        placement = np.array([[[0, 1], [1, 0]]])
        workload = np.array([[[2, 3], [4, 1]]])

        # Case 1: aggregated workload width disagreeing with the expert
        # count must raise.
        with patch.object(DynamicEplb,
                          'add_redundant',
                          return_value=np.array([[1, 2, 3]])):
            with pytest.raises(ValueError):
                policy.rebalance_experts(placement, workload)

        # Case 2: an empty deployment (0 NPUs, 0 experts) must raise.
        with pytest.raises(ValueError):
            policy.rebalance_experts(np.array([[]]), np.array([[]]))

        # Case 3: fewer NPUs than redundant experts must raise.
        with patch.object(DynamicEplb, 'get_redundant_num', return_value=2):
            with pytest.raises(ValueError):
                policy.rebalance_experts(np.array([[[0, 0]]]),
                                         np.array([[[1, 1]]]))

View File

@@ -0,0 +1,99 @@
from typing import Dict, Set
import numpy as np
import pytest
from vllm_ascend.eplb.core.policy.policy_dynamic_ep_v2 import (DynamicConfig,
DynamicEplbV2)
@pytest.fixture
def config():
    """A fresh default DynamicConfig for each test."""
    cfg = DynamicConfig()
    return cfg
@pytest.fixture
def policy(config):
    """A DynamicEplbV2 built from the default config fixture."""
    instance = DynamicEplbV2(config)
    return instance
def test_safe_operations(policy):
    """safe_* helpers compute normally and fall back to 0 on zero divisors."""
    cases = [
        (policy.safe_divide, 10, 2, 5),
        (policy.safe_divide, 1, 0, 0),
        (policy.safe_exact_divide, 10, 3, 3),
        (policy.safe_exact_divide, 1, 0, 0),
        (policy.safe_mod, 10, 3, 1),
        (policy.safe_mod, 1, 0, 0),
    ]
    for op, lhs, rhs, expected in cases:
        assert op(lhs, rhs) == expected
def test_add_redundant():
    """Workloads are accumulated per logical expert across all cards."""
    workload = np.array([[[1, 2], [3, 4]]])
    placement = np.array([[[0, 1], [0, 1]]])
    summed = DynamicEplbV2.add_redundant(placement, workload, 2)
    assert summed.shape == (1, 2)
    # Expert 0: 1 + 3, expert 1: 2 + 4.
    assert np.array_equal(summed[0], np.array([4, 6]))
def test_get_redundant_num():
    """Redundancy is sum(counts - 1) over all experts."""
    assert DynamicEplbV2.get_redundant_num(3, np.array([1, 2, 1])) == 1
def test_calculate_max_heat_per_layer():
    """Per layer, the hottest card's total workload is reported."""
    workload = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
    # Layer 0: max(1+2, 3+4) == 7; layer 1: max(5+6, 7+8) == 15.
    assert DynamicEplbV2.calculate_max_heat_per_layer(workload, 2) == [7, 15]
def test_calculate_initial_imbalance(policy):
    """One imbalance value is produced per layer."""
    deployment = np.array([[[0, 1], [0, 1]]])
    workloads = np.array([[1, 1]])
    ratios = policy.calculate_initial_imbalance(deployment, workloads)
    assert isinstance(ratios, list)
    assert len(ratios) == 1
def test_compute_redundant_assignments(policy):
    """Each expert gets a redundancy slot and a sorted-weight entry."""
    base = [(0, 10), (1, 5)]
    redundant, ordered = policy.compute_redundant_assignments(
        base, num_redundant_experts=2, num_experts=2)
    assert len(redundant) == 2
    assert len(ordered) == 2
def test_prepare_expert_list():
    """One redundant-expert entry is produced for the requested count."""
    base = [(0, 10), (1, 5)]
    redundancy = [[2], []]
    prepared = DynamicEplbV2.prepare_expert_list(base, redundancy, 1)
    assert isinstance(prepared, list)
    assert len(prepared) == 1
def test_non_redundant_expert_information():
    """Assignments keep the original placement; loads sum the weights."""
    deployment = np.array([[0, 1]])
    weights = [(0, 10), (1, 5)]
    vacancies: Dict[int, Set[int]] = {0: set()}
    assignments, _weights, loads, _counts = (
        DynamicEplbV2.non_redundant_expert_information(
            deployment, weights, vacancies))
    assert assignments[0] == [0, 1]
    # 10 + 5 from the two hosted experts.
    assert loads[0] == 15
def test_recomputing_initial_weight(policy):
    """Workloads are re-read per device and expert counts tallied."""
    workloads = [10, 5]
    assignments = [[0, 1]]
    layer_workload, expert_counts = policy.recomputing_initial_weight(
        workloads, assignments)
    assert layer_workload[0] == 10
    assert expert_counts[0] == 1
def test_safe_divide_zero_edge_case(policy):
    """A zero numerator divides cleanly to zero for any nonzero divisor."""
    for divisor in (1, 5):
        assert policy.safe_divide(0, divisor) == 0

View File

@@ -0,0 +1,23 @@
import pytest
from vllm_ascend.eplb.core.policy.policy_abstract import DynamicConfig
from vllm_ascend.eplb.core.policy.policy_dynamic_ep import DynamicEplb
from vllm_ascend.eplb.core.policy.policy_dynamic_ep_v2 import DynamicEplbV2
from vllm_ascend.eplb.core.policy.policy_factory import PolicyFactory
from vllm_ascend.eplb.core.policy.policy_random import RandomLoadBalance
@pytest.fixture
def dummy_config():
    """A default DynamicConfig shared by the factory tests."""
    cfg = DynamicConfig()
    return cfg
@pytest.mark.parametrize("policy_type, expected_class", [
    (0, RandomLoadBalance),
    (1, DynamicEplb),
    (2, DynamicEplbV2),
    # Unknown ids fall back to the random balancer.
    (999, RandomLoadBalance),
])
def test_generate_policy(policy_type, expected_class, dummy_config):
    """The factory maps each policy id to its implementation class."""
    produced = PolicyFactory.generate_policy(policy_type, dummy_config)
    assert isinstance(produced, expected_class)

View File

@@ -0,0 +1,122 @@
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import torch
import vllm_ascend.eplb.core.eplb_device_transfer_loader as loader
@pytest.fixture
def mock_adaptor():
    """MagicMock adaptor with one layer of expert maps, params and buffers."""
    adaptor = MagicMock()
    # Layer 0: logical expert id -> local slot.
    adaptor.expert_map_per_layer_cpu = {
        0: {10: torch.tensor(1), 20: torch.tensor(0)},
    }
    # Layer 0: local slot -> parameter tensors.
    adaptor.expert_param_per_layer = {
        0: {0: [[torch.tensor([1.0])]], 1: [[torch.tensor([2.0])]]},
    }
    # Staging buffers used during the device-to-device weight transfer.
    adaptor.buffer_tensor_list = [[[torch.tensor([3.0])],
                                   [torch.tensor([4.0])]]]
    return adaptor
def test_generate_task_and_state_flow(mock_adaptor):
    """Transfer tasks are only generated from WAITING state with real work."""
    loader_obj = loader.D2DExpertWeightLoader()
    loader_obj.set_adator(mock_adaptor)
    # Stub out collectives so no process group is needed.
    with patch("torch.distributed.P2POp") as mock_p2p, \
            patch("torch.distributed.isend", return_value="isend_op"), \
            patch("torch.distributed.irecv", return_value="irecv_op"):
        mock_p2p.side_effect = lambda op, tensor, rank: (op, tensor, rank)
        # In READY state the call must be a no-op: no comm ops queued.
        loader_obj.state = loader.ExpertWeightUpdateState.READY
        loader_obj.generate_expert_d2d_transfer_task([(1, 10)], [(2, 20)],
                                                     {20: torch.tensor(0)}, 0)
        assert loader_obj.comm_op_list is None
        # In WAITING state with empty send/recv lists, still nothing queued.
        loader_obj.state = loader.ExpertWeightUpdateState.WAITING
        loader_obj.generate_expert_d2d_transfer_task([], [], {}, 0)
        assert loader_obj.comm_op_list is None
        # With actual send/recv work the loader queues ops, records the
        # experts to receive, and advances WAITING -> READY.
        updated_map = {20: torch.tensor(0)}
        loader_obj.generate_expert_d2d_transfer_task([(1, 10)], [(2, 20)],
                                                     updated_map, 0)
        assert loader_obj.state == loader.ExpertWeightUpdateState.READY
        assert loader_obj.comm_op_list
        assert loader_obj.recv_expert_list
def test_asyn_transfer_and_update(mock_adaptor):
    """Transfer starts from READY; the update applies maps/weights and resets."""
    loader_obj = loader.D2DExpertWeightLoader()
    loader_obj.set_adator(mock_adaptor)
    loader_obj.comm_op_list = ["fake_op"]
    loader_obj.state = loader.ExpertWeightUpdateState.READY
    reqs: list[MagicMock] = []
    # Stub the batched collective so no real process group is required.
    with patch("torch.distributed.batch_isend_irecv",
               return_value=[MagicMock(), MagicMock()]):
        loader_obj.asyn_expert_weight_transfer(reqs)
    # Starting the transfer moves READY -> TRANSFERRING and records requests.
    assert loader_obj.state == loader.ExpertWeightUpdateState.TRANSFERRING
    assert len(reqs) > 0
    # Simulate a completed transfer request and the state needed to apply it.
    mock_req = MagicMock()
    mock_req.wait.return_value = None
    reqs = [mock_req]
    loader_obj.recv_expert_list = [(0, 0)]
    loader_obj.updated_expert_map = {20: torch.tensor(0)}
    loader_obj.updated_log2phy_map = {"dummy": 1}
    loader_obj.layer_id = 0
    loader_obj.comm_op_list = ["op"]
    loader_obj.update_expert_map_and_weight(reqs)
    # The adaptor is asked exactly once each for map, log2phy and weights.
    mock_adaptor.do_update_expert_map.assert_called_once()
    mock_adaptor.do_update_log2phy_map.assert_called_once()
    mock_adaptor.do_update_expert_weight.assert_called_once()
    # The loader returns to WAITING with its receive list cleared.
    assert loader_obj.state == loader.ExpertWeightUpdateState.WAITING
    assert loader_obj.recv_expert_list == []
def test_set_log2phy_map(mock_adaptor):
    """set_log2phy_map stashes the given map on the loader."""
    loader_obj = loader.D2DExpertWeightLoader()
    loader_obj.set_adator(mock_adaptor)
    new_map = {"a": 1}
    loader_obj.set_log2phy_map(new_map)
    assert loader_obj.updated_log2phy_map == {"a": 1}
def test_invalid_state_asyn_update(mock_adaptor):
    """Calls made in the wrong loader state must be no-ops."""
    loader_obj = loader.D2DExpertWeightLoader()
    loader_obj.set_adator(mock_adaptor)

    # A transfer must not start while the loader is still WAITING.
    pending: list[Any] = []
    loader_obj.state = loader.ExpertWeightUpdateState.WAITING
    loader_obj.asyn_expert_weight_transfer(pending)
    assert pending == []

    # Map/weight updates must not run while the loader is only READY.
    loader_obj.state = loader.ExpertWeightUpdateState.READY
    loader_obj.update_expert_map_and_weight([])
    assert not mock_adaptor.do_update_expert_map.called
def test_load_impl_not_implemented(mock_adaptor):
    """The base load_impl hook is intentionally unimplemented."""
    loader_obj = loader.D2DExpertWeightLoader()
    loader_obj.set_adator(mock_adaptor)
    with pytest.raises(NotImplementedError):
        loader_obj.load_impl({}, {})

View File

@@ -0,0 +1,79 @@
import random
import torch
from vllm_ascend.eplb.core import eplb_utils
def test_determine_default_expert_map_single_world():
    """A single-rank world owns all experts with an identity mapping."""
    local_count, mapping = eplb_utils.determine_default_expert_map(
        global_expert_num=4,
        world_size=1,
        rank_id=0,
        global_redundant_expert_num=0)
    assert local_count == 4
    assert torch.equal(mapping, torch.arange(4, dtype=torch.int32))
def test_determine_default_expert_map_multiple_worlds_no_redundant():
    """With two ranks and no redundancy, rank 0 owns the first half."""
    local_count, mapping = eplb_utils.determine_default_expert_map(
        global_expert_num=8,
        world_size=2,
        rank_id=0,
        global_redundant_expert_num=0)
    assert local_count == 4
    # Owned experts map to valid local slots; the rest are marked -1.
    assert torch.all(mapping[:4] >= 0)
    assert torch.all(mapping[4:] == -1)
def test_determine_default_expert_map_multiple_worlds_with_redundant():
    """A redundant expert enlarges rank 0's share of a 5-expert table."""
    local_count, mapping = eplb_utils.determine_default_expert_map(
        global_expert_num=5,
        world_size=2,
        rank_id=0,
        global_redundant_expert_num=1)
    assert local_count == 3
    assert torch.all(mapping[:3] >= 0)
def test_generate_log2phy_map_single_rank_holding():
    """An expert held by exactly one rank fills its whole column."""
    expert_map = torch.tensor([[0, -1], [-1, 0]], dtype=torch.int32)
    log2phy = eplb_utils.generate_log2phy_map(expert_map)
    # Expert 0 lives only on rank 0; expert 1 only on rank 1.
    assert torch.all(log2phy[:, 0] == log2phy[0, 0])
    assert torch.all(log2phy[:, 1] == log2phy[1, 1])
def test_generate_log2phy_map_multiple_rank_holding(monkeypatch):
    """When several ranks hold an expert, the map is still fully valid."""
    expert_map = torch.tensor([[0], [0]], dtype=torch.int32)
    # Pin random.choice so any tie-break is deterministic.
    monkeypatch.setattr(random, "choice", lambda candidates: candidates[0])
    log2phy = eplb_utils.generate_log2phy_map(expert_map)
    assert log2phy.shape == (2, 1)
    assert (log2phy >= 0).all()
def test_determine_default_log2phy_map_world_size_1():
    """A single-rank world yields one valid slot per logical expert."""
    log2phy = eplb_utils.determine_default_log2phy_map(
        global_expert_num=3,
        world_size=1,
        rank_id=0,
        global_redundant_expert_num=0)
    assert log2phy.shape == (3, )
    assert (log2phy >= 0).all()
def test_determine_default_log2phy_map_world_size_multiple():
    """Rank 1 of two also gets a complete, valid log2phy row."""
    log2phy = eplb_utils.determine_default_log2phy_map(
        global_expert_num=6,
        world_size=2,
        rank_id=1,
        global_redundant_expert_num=1)
    assert log2phy.shape == (6, )
    assert (log2phy >= 0).all()