From 593a96056cfaaa64746ff05f3da526382a3d5d64 Mon Sep 17 00:00:00 2001 From: LI SHENGYONG <49200266+shenchuxiaofugui@users.noreply.github.com> Date: Wed, 3 Dec 2025 12:00:05 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90EPLB=E3=80=91Eplb=20Redundant=20Expert?= =?UTF-8?q?s=20Bugfix=20(#4232)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What this PR does / why we need it? Redundant experts bugfix The calculation logic for redundant experts has been fixed, allowing the correct number of redundant experts to be calculated using the map. Therefore, there is no longer a need to set the redundant expert parameter when passing the map. ### Does this PR introduce _any_ user-facing change? After configuring the path for experts_map, users do not need to configure iinit_redundancy_expert. ### How was this patch tested? The accuracy of EPLB was tested with and without the use of redundant experts. --------- Signed-off-by: shenchuxiaofugui <1311027364@qq.com> --- tests/ut/eplb/core/test_eplb_utils.py | 16 +++---- tests/ut/ops/test_expert_load_balancer.py | 15 +++--- vllm_ascend/eplb/core/eplb_utils.py | 3 +- vllm_ascend/ops/common_fused_moe.py | 48 ++++++------------- vllm_ascend/ops/expert_load_balancer.py | 8 ++-- vllm_ascend/quantization/w4a8_dynamic.py | 3 +- vllm_ascend/quantization/w8a8_dynamic.py | 5 +- .../torchair/ops/torchair_fused_moe.py | 6 +-- .../quantization/torchair_w8a8_dynamic.py | 6 ++- 9 files changed, 45 insertions(+), 65 deletions(-) diff --git a/tests/ut/eplb/core/test_eplb_utils.py b/tests/ut/eplb/core/test_eplb_utils.py index 29ee427..4387800 100644 --- a/tests/ut/eplb/core/test_eplb_utils.py +++ b/tests/ut/eplb/core/test_eplb_utils.py @@ -64,21 +64,17 @@ def test_generate_log2phy_map_multiple_rank_holding(monkeypatch): def test_determine_default_log2phy_map_world_size_1(): - log2phy = eplb_utils.determine_default_log2phy_map( - global_expert_num=3, - world_size=1, - rank_id=0, - global_redundant_expert_num=0) + log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=3, + world_size=1, + rank_id=0) assert log2phy.shape == (3, ) assert (log2phy >= 0).all() def test_determine_default_log2phy_map_world_size_multiple(): - log2phy = eplb_utils.determine_default_log2phy_map( - global_expert_num=6, - world_size=2, - rank_id=1, - global_redundant_expert_num=1) + log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=6, + world_size=2, + rank_id=1) assert log2phy.shape == (6, ) assert (log2phy >= 0).all() diff --git a/tests/ut/ops/test_expert_load_balancer.py b/tests/ut/ops/test_expert_load_balancer.py index 97beada..f7f6847 100644 --- a/tests/ut/ops/test_expert_load_balancer.py +++ b/tests/ut/ops/test_expert_load_balancer.py @@ -48,8 +48,7 @@ class TestExpertLoadBalancer(TestBase): with open(json_file, 'r') as f: self.expert_map: MockData = json.load(f) - self.expert_load_balancer = ExpertLoadBalancer(json_file, - global_expert_num=8) + self.expert_load_balancer = ExpertLoadBalancer(json_file, 8) def test_init(self): @@ -83,7 +82,7 @@ class TestExpertLoadBalancer(TestBase): ) self.assertEqual(expert_placement_map.shape, (self.expert_load_balancer.layers_num, - self.expert_load_balancer.ranks_num, 8)) + self.expert_load_balancer.ranks_num, 10)) self.assertTrue(torch.all(expert_placement_map >= -1)) def test_generate_log2phy_expert_map(self): @@ -91,7 +90,7 @@ class TestExpertLoadBalancer(TestBase): log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map( layer_id) self.assertEqual(log2phy_map.shape, - (self.expert_load_balancer.ranks_num, 8)) + (self.expert_load_balancer.ranks_num, 10)) self.assertTrue(torch.all(log2phy_map >= -1)) @mock.patch("torch_npu.npu._lazy_init") @@ -102,7 +101,7 @@ class TestExpertLoadBalancer(TestBase): rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map( layer_id, rank_id) self.assertEqual(rank_local_expert_num, 5) - expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0], + expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0, -1, -1], dtype=torch.int32).to( rank_expert_map.device) self.assertTrue(rank_expert_map.equal(expected_tensor)) @@ -110,7 +109,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 1 rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map( layer_id, rank_id) - expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3], + expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3, -1, -1], dtype=torch.int32).to( rank_expert_map.device) self.assertTrue(rank_expert_map.equal(expected_tensor)) @@ -120,7 +119,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 0 log2phy_map = self.expert_load_balancer.get_rank_log2phy_map( layer_id, rank_id) - expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0], + expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0, -1, -1], dtype=torch.int32).to( log2phy_map.device) self.assertTrue(log2phy_map.equal(expected_tensor)) @@ -128,7 +127,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 1 log2phy_map = self.expert_load_balancer.get_rank_log2phy_map( layer_id, rank_id) - expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8], + expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8, -1, -1], dtype=torch.int32).to( log2phy_map.device) self.assertTrue(log2phy_map.equal(expected_tensor)) diff --git a/vllm_ascend/eplb/core/eplb_utils.py b/vllm_ascend/eplb/core/eplb_utils.py index 0a55868..d7fd17e 100644 --- a/vllm_ascend/eplb/core/eplb_utils.py +++ b/vllm_ascend/eplb/core/eplb_utils.py @@ -88,8 +88,7 @@ def generate_log2phy_map(expert_map): return log2phy_map -def determine_default_log2phy_map(global_expert_num, world_size, rank_id, - global_redundant_expert_num): +def determine_default_log2phy_map(global_expert_num, world_size, rank_id): if world_size == 1: local_ids = torch.arange(global_expert_num, dtype=torch.int32) expert_map_all = local_ids.unsqueeze(0).expand(world_size, -1) diff --git a/vllm_ascend/ops/common_fused_moe.py b/vllm_ascend/ops/common_fused_moe.py index e33835b..d560710 100644 --- a/vllm_ascend/ops/common_fused_moe.py +++ b/vllm_ascend/ops/common_fused_moe.py @@ -33,8 +33,7 @@ from vllm.model_executor.layers.shared_fused_moe import SharedFusedMoE from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import MoECommType from vllm_ascend.distributed.parallel_state import get_mc2_group -from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map, - determine_default_log2phy_map) +from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.ops.moe.experts_selector import select_experts from vllm_ascend.ops.moe.moe_comm_method import setup_moe_comm_method @@ -149,10 +148,8 @@ class AscendFusedMoE(FusedMoE): AscendFusedMoE.moe_counter += 1 self.moe_instance_id = AscendFusedMoE.moe_counter - self.global_num_experts = num_experts self.expert_map = None self.log2phy = None - self.global_redundant_expert_num = 0 if self.quant_config is None: self.quant_method = AscendUnquantizedFusedMoEMethod( @@ -181,10 +178,11 @@ class AscendFusedMoE(FusedMoE): self.expert_map_path) and os.access(self.expert_map_path, os.R_OK): self.expert_load_balancer = ExpertLoadBalancer( - self.expert_map_path, self.global_num_experts) + self.expert_map_path, num_experts) self.expert_load_balancer.check_expert_map_tensor() self.global_redundant_expert_num = ( self.expert_load_balancer.get_global_redundant_expert_num()) + self.global_num_experts = num_experts + self.global_redundant_expert_num try: self.local_num_experts, self.expert_map = ( self.expert_load_balancer.get_rank_placement_map( @@ -194,41 +192,26 @@ class AscendFusedMoE(FusedMoE): except Exception as e: logger.warning( f"Init expert map of mtp/eagle when using sample.{e}") - self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) + self.local_num_experts, self.expert_map = determine_expert_map( + self.ep_size, self.ep_rank, self.global_num_experts) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() - if self.expert_map is not None and isinstance( - self.expert_map, torch.Tensor): - logger.info_once( - "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" - " number of experts: %s/%s. Experts local to global index map:" - " %s.", self.ep_rank, self.ep_size, self.local_num_experts, - self.global_num_experts, - get_compressed_expert_map(self.expert_map)) + self.global_num_experts, self.ep_size, self.ep_rank).npu() else: # init moe. self.local_num_experts, self.expert_map = determine_expert_map( self.ep_size, self.ep_rank, self.global_num_experts) # dynamic eplb initializing with not expert_map_path if self.dynamic_eplb: - self.global_redundant_expert_num = ascend_config.init_redundancy_expert - self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() - if self.expert_map is not None and isinstance( - self.expert_map, torch.Tensor): - logger.info_once( - "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" - " number of experts: %s/%s. Experts local to global index map:" - " %s.", self.ep_rank, self.ep_size, self.local_num_experts, - self.global_num_experts, - get_compressed_expert_map(self.expert_map)) + self.global_num_experts, self.ep_size, self.ep_rank).npu() + if self.expert_map is not None and isinstance(self.expert_map, + torch.Tensor): + logger.info_once( + "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" + " number of experts: %s/%s. Experts local to global index map:" + " %s.", self.ep_rank, self.ep_size, self.local_num_experts, + self.global_num_experts, + get_compressed_expert_map(self.expert_map)) local_num_experts = (torch.sum( self.expert_map != -1) if self.expert_map is not None else self.global_num_experts) @@ -302,7 +285,6 @@ class AscendFusedMoE(FusedMoE): # This approach may overlook some extreme scenarios. enable_force_load_balance = forward_context.in_profile_run - forward_context = get_forward_context() hidden_states, router_logits = forward_context.moe_comm_method.prepare( hidden_states=hidden_states, router_logits=router_logits, diff --git a/vllm_ascend/ops/expert_load_balancer.py b/vllm_ascend/ops/expert_load_balancer.py index de6a7c5..5b03ffa 100644 --- a/vllm_ascend/ops/expert_load_balancer.py +++ b/vllm_ascend/ops/expert_load_balancer.py @@ -8,12 +8,14 @@ import torch.distributed as dist class ExpertLoadBalancer(object): - def __init__(self, expert_map_path, global_expert_num): + def __init__(self, expert_map_path, num_experts): self.expert_map_path = expert_map_path - self.global_expert_num = global_expert_num + self.num_experts = num_experts self.tensor_data = [] self.expert_map_tensor, self.layers_num, self.ranks_num = ( self._expert_file_to_tensor()) + self.global_expert_num = num_experts + self.get_global_redundant_expert_num( + ) self.expert_placement_map = self.generate_expert_placement_map() def _expert_file_to_tensor(self): @@ -96,7 +98,7 @@ class ExpertLoadBalancer(object): def get_global_redundant_expert_num(self): global_redundant_expert_num = ( len(self.expert_map_tensor[0][0]) * self.ranks_num - - self.global_expert_num) + self.num_experts) return global_redundant_expert_num def check_expert_map_tensor(self): diff --git a/vllm_ascend/quantization/w4a8_dynamic.py b/vllm_ascend/quantization/w4a8_dynamic.py index 2133607..11dc97d 100644 --- a/vllm_ascend/quantization/w4a8_dynamic.py +++ b/vllm_ascend/quantization/w4a8_dynamic.py @@ -371,7 +371,8 @@ class AscendW4A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. # currently it is only activated when doing profile runs. if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, global_num_experts - global_redundant_expert_num) topk_weights = topk_weights.to(x.dtype) diff --git a/vllm_ascend/quantization/w8a8_dynamic.py b/vllm_ascend/quantization/w8a8_dynamic.py index 0f96e8c..59b0479 100644 --- a/vllm_ascend/quantization/w8a8_dynamic.py +++ b/vllm_ascend/quantization/w8a8_dynamic.py @@ -197,7 +197,7 @@ class AscendW8A8DynamicFusedMoEMethod: scoring_func: str = "softmax", e_score_correction_bias: Optional[torch.Tensor] = None, is_prefill: bool = True, - enable_force_load_balance: bool = True, + enable_force_load_balance: bool = False, log2phy: torch.Tensor = None, global_redundant_expert_num: int = 0, shared_experts: Optional[Any] = None, @@ -225,7 +225,8 @@ class AscendW8A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. # currently it is only activated when doing profile runs. if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, global_num_experts - global_redundant_expert_num) if self.use_aclgraph: moe_comm_method = get_forward_context().moe_comm_method diff --git a/vllm_ascend/torchair/ops/torchair_fused_moe.py b/vllm_ascend/torchair/ops/torchair_fused_moe.py index 7a14ca5..d42d023 100644 --- a/vllm_ascend/torchair/ops/torchair_fused_moe.py +++ b/vllm_ascend/torchair/ops/torchair_fused_moe.py @@ -1058,8 +1058,7 @@ class TorchairAscendFusedMoE(FusedMoE): self.global_num_experts, self.ep_size, self.ep_rank, self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() + self.global_num_experts, self.ep_size, self.ep_rank).npu() if self.expert_map is not None and isinstance( self.expert_map, torch.Tensor): logger.info_once( @@ -1079,8 +1078,7 @@ class TorchairAscendFusedMoE(FusedMoE): self.global_num_experts, self.ep_size, self.ep_rank, self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() + self.global_num_experts, self.ep_size, self.ep_rank).npu() if self.expert_map is not None and isinstance( self.expert_map, torch.Tensor): logger.info_once( diff --git a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py index bc0a8d3..f639270 100644 --- a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py +++ b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py @@ -925,7 +925,7 @@ class TorchairAscendW8A8DynamicFusedMoEMethod: scoring_func: str = "softmax", e_score_correction_bias: Optional[torch.Tensor] = None, is_prefill: bool = True, - enable_force_load_balance: bool = True, + enable_force_load_balance: bool = False, log2phy: torch.Tensor = None, global_redundant_expert_num: int = 0, shared_experts: Optional[Any] = None, @@ -990,7 +990,9 @@ class TorchairAscendW8A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. # currently it is only activated when doing profile runs. if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, + global_num_experts - global_redundant_expert_num) topk_weights = topk_weights.to(x.dtype) if fused_moe_state == FusedMoEState.AllGatherEP: