From 019c7ded91f0c2a9533ace8d87f0cf56e396f979 Mon Sep 17 00:00:00 2001 From: LI SHENGYONG <49200266+shenchuxiaofugui@users.noreply.github.com> Date: Fri, 21 Nov 2025 14:24:35 +0800 Subject: [PATCH] eplb redundant expert bugfix (#4291) ### What this PR does / why we need it? Redundant experts bugfix ### Does this PR introduce _any_ user-facing change? After configuring the path for experts_map, users do not need to configure init_redundancy_expert. ### How was this patch tested? The accuracy of EPLB was tested with and without the use of redundant experts. - vLLM version: v0.11.0 - vLLM main: https://github.com/vllm-project/vllm/commit/2918c1b49c88c29783c86f78d2c4221cb9622379 --------- Signed-off-by: shenchuxiaofugui <1311027364@qq.com> --- .../feature_guide/eplb_swift_balancer.md | 7 +++ tests/ut/eplb/core/test_eplb_utils.py | 49 ++------------- tests/ut/ops/test_expert_load_balancer.py | 15 +++-- vllm_ascend/eplb/core/eplb_utils.py | 29 +-------- vllm_ascend/ops/expert_load_balancer.py | 8 ++- vllm_ascend/ops/fused_moe/fused_moe.py | 60 +++++++------------ vllm_ascend/quantization/w4a8_dynamic.py | 5 +- vllm_ascend/quantization/w8a8_dynamic.py | 5 +- .../torchair/ops/torchair_fused_moe.py | 21 +++---- .../quantization/torchair_w8a8_dynamic.py | 4 +- 10 files changed, 63 insertions(+), 140 deletions(-) diff --git a/docs/source/user_guide/feature_guide/eplb_swift_balancer.md b/docs/source/user_guide/feature_guide/eplb_swift_balancer.md index 722eb62b..90d8c88e 100644 --- a/docs/source/user_guide/feature_guide/eplb_swift_balancer.md +++ b/docs/source/user_guide/feature_guide/eplb_swift_balancer.md @@ -12,6 +12,13 @@ Expert balancing for MoE models in LLM serving is essential for optimal performa - Adaptive Scaling: Automatically adjusts to workload fluctuations while maintaining stable performance. - Fault Tolerance: Redundant expert placement ensures system resilience during hardware failures. 
+## Support Scenarios + +### Models: +DeepseekV3/V3.1/R1、Qwen3-MOE +### MOE QuantType: +W8A8-dynamic + ## How to Use EPLB ### Dynamic EPLB diff --git a/tests/ut/eplb/core/test_eplb_utils.py b/tests/ut/eplb/core/test_eplb_utils.py index 0d73c7a3..28d5d425 100644 --- a/tests/ut/eplb/core/test_eplb_utils.py +++ b/tests/ut/eplb/core/test_eplb_utils.py @@ -9,39 +9,6 @@ from vllm_ascend.eplb.core import eplb_utils from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils -def test_determine_default_expert_map_single_world(): - count, expert_map = eplb_utils.determine_default_expert_map( - global_expert_num=4, - world_size=1, - rank_id=0, - global_redundant_expert_num=0) - assert count == 4 - assert torch.equal(expert_map, torch.arange(4, dtype=torch.int32)) - - -def test_determine_default_expert_map_multiple_worlds_no_redundant(): - count, expert_map = eplb_utils.determine_default_expert_map( - global_expert_num=8, - world_size=2, - rank_id=0, - global_redundant_expert_num=0) - - assert count == 4 - assert torch.all(expert_map[:4] >= 0) - assert torch.all(expert_map[4:] == -1) - - -def test_determine_default_expert_map_multiple_worlds_with_redundant(): - count, expert_map = eplb_utils.determine_default_expert_map( - global_expert_num=5, - world_size=2, - rank_id=0, - global_redundant_expert_num=1) - - assert count == 2 - assert torch.all(expert_map[0:2] >= 0) - - def test_generate_log2phy_map_single_rank_holding(): expert_map = torch.tensor([[0, -1], [-1, 0]], dtype=torch.int32) @@ -64,21 +31,17 @@ def test_generate_log2phy_map_multiple_rank_holding(monkeypatch): def test_determine_default_log2phy_map_world_size_1(): - log2phy = eplb_utils.determine_default_log2phy_map( - global_expert_num=3, - world_size=1, - rank_id=0, - global_redundant_expert_num=0) + log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=3, + world_size=1, + rank_id=0) assert log2phy.shape == (3, ) assert (log2phy >= 0).all() def 
test_determine_default_log2phy_map_world_size_multiple(): - log2phy = eplb_utils.determine_default_log2phy_map( - global_expert_num=6, - world_size=2, - rank_id=1, - global_redundant_expert_num=1) + log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=6, + world_size=2, + rank_id=1) assert log2phy.shape == (6, ) assert (log2phy >= 0).all() diff --git a/tests/ut/ops/test_expert_load_balancer.py b/tests/ut/ops/test_expert_load_balancer.py index 97beada1..f7f68472 100644 --- a/tests/ut/ops/test_expert_load_balancer.py +++ b/tests/ut/ops/test_expert_load_balancer.py @@ -48,8 +48,7 @@ class TestExpertLoadBalancer(TestBase): with open(json_file, 'r') as f: self.expert_map: MockData = json.load(f) - self.expert_load_balancer = ExpertLoadBalancer(json_file, - global_expert_num=8) + self.expert_load_balancer = ExpertLoadBalancer(json_file, 8) def test_init(self): @@ -83,7 +82,7 @@ class TestExpertLoadBalancer(TestBase): ) self.assertEqual(expert_placement_map.shape, (self.expert_load_balancer.layers_num, - self.expert_load_balancer.ranks_num, 8)) + self.expert_load_balancer.ranks_num, 10)) self.assertTrue(torch.all(expert_placement_map >= -1)) def test_generate_log2phy_expert_map(self): @@ -91,7 +90,7 @@ class TestExpertLoadBalancer(TestBase): log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map( layer_id) self.assertEqual(log2phy_map.shape, - (self.expert_load_balancer.ranks_num, 8)) + (self.expert_load_balancer.ranks_num, 10)) self.assertTrue(torch.all(log2phy_map >= -1)) @mock.patch("torch_npu.npu._lazy_init") @@ -102,7 +101,7 @@ class TestExpertLoadBalancer(TestBase): rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map( layer_id, rank_id) self.assertEqual(rank_local_expert_num, 5) - expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0], + expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0, -1, -1], dtype=torch.int32).to( rank_expert_map.device) 
self.assertTrue(rank_expert_map.equal(expected_tensor)) @@ -110,7 +109,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 1 rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map( layer_id, rank_id) - expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3], + expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3, -1, -1], dtype=torch.int32).to( rank_expert_map.device) self.assertTrue(rank_expert_map.equal(expected_tensor)) @@ -120,7 +119,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 0 log2phy_map = self.expert_load_balancer.get_rank_log2phy_map( layer_id, rank_id) - expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0], + expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0, -1, -1], dtype=torch.int32).to( log2phy_map.device) self.assertTrue(log2phy_map.equal(expected_tensor)) @@ -128,7 +127,7 @@ class TestExpertLoadBalancer(TestBase): rank_id = 1 log2phy_map = self.expert_load_balancer.get_rank_log2phy_map( layer_id, rank_id) - expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8], + expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8, -1, -1], dtype=torch.int32).to( log2phy_map.device) self.assertTrue(log2phy_map.equal(expected_tensor)) diff --git a/vllm_ascend/eplb/core/eplb_utils.py b/vllm_ascend/eplb/core/eplb_utils.py index 4e9c56c9..b43b85b6 100644 --- a/vllm_ascend/eplb/core/eplb_utils.py +++ b/vllm_ascend/eplb/core/eplb_utils.py @@ -25,32 +25,6 @@ from vllm.logger import logger import vllm_ascend.envs as envs_ascend -def determine_default_expert_map(global_expert_num, world_size, rank_id, - global_redundant_expert_num): - if world_size == 1: - local_ids = torch.arange(global_expert_num, dtype=torch.int32) - return (global_expert_num, local_ids) - - local_num_experts = global_expert_num // world_size - - expert_map = torch.full((global_expert_num, ), -1, dtype=torch.int32) - - if rank_id < world_size - 1: - start = rank_id * local_num_experts - end = (rank_id + 1) * local_num_experts - 
local_count = local_num_experts - else: - start = rank_id * local_num_experts - end = global_expert_num - local_count = global_expert_num - rank_id * local_num_experts - - if isinstance(local_count, int): - local_ids = torch.arange(local_count, dtype=torch.int32) - expert_map[start:end] = local_ids - - return (local_count, expert_map) - - def generate_log2phy_map(expert_map): num_local_experts = expert_map.max() + 1 log2phy_map = expert_map.clone() @@ -90,8 +64,7 @@ def generate_log2phy_map(expert_map): return log2phy_map -def determine_default_log2phy_map(global_expert_num, world_size, rank_id, - global_redundant_expert_num): +def determine_default_log2phy_map(global_expert_num, world_size, rank_id): if world_size == 1: local_ids = torch.arange(global_expert_num, dtype=torch.int32) expert_map_all = local_ids.unsqueeze(0).expand(world_size, -1) diff --git a/vllm_ascend/ops/expert_load_balancer.py b/vllm_ascend/ops/expert_load_balancer.py index 604986b4..7e8a9aef 100644 --- a/vllm_ascend/ops/expert_load_balancer.py +++ b/vllm_ascend/ops/expert_load_balancer.py @@ -8,12 +8,14 @@ import torch.distributed as dist class ExpertLoadBalancer(object): - def __init__(self, expert_map_path, global_expert_num): + def __init__(self, expert_map_path, num_experts): self.expert_map_path = expert_map_path - self.global_expert_num = global_expert_num + self.num_experts = num_experts self.tensor_data = [] self.expert_map_tensor, self.layers_num, self.ranks_num = ( self._expert_file_to_tensor()) + self.global_expert_num = num_experts + self.get_global_redundant_expert_num( + ) self.expert_placement_map = self.generate_expert_placement_map() def _expert_file_to_tensor(self): @@ -95,7 +97,7 @@ class ExpertLoadBalancer(object): def get_global_redundant_expert_num(self): global_redundant_expert_num = ( len(self.expert_map_tensor[0][0]) * self.ranks_num - - self.global_expert_num) + self.num_experts) return global_redundant_expert_num def check_expert_map_tensor(self): diff --git 
a/vllm_ascend/ops/fused_moe/fused_moe.py b/vllm_ascend/ops/fused_moe/fused_moe.py index 113cd47e..f82d341a 100644 --- a/vllm_ascend/ops/fused_moe/fused_moe.py +++ b/vllm_ascend/ops/fused_moe/fused_moe.py @@ -32,8 +32,7 @@ from vllm.model_executor.layers.fused_moe.layer import ( from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import MoECommType from vllm_ascend.distributed.parallel_state import get_mc2_group -from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map, - determine_default_log2phy_map) +from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.ops.fused_moe.experts_selector import select_experts from vllm_ascend.ops.fused_moe.moe_comm_method import setup_moe_comm_method @@ -183,10 +182,8 @@ class AscendFusedMoE(FusedMoE): AscendFusedMoE.moe_counter += 1 self.moe_instance_id = AscendFusedMoE.moe_counter - self.global_num_experts = num_experts self.expert_map = None self.log2phy = None - self.global_redundant_expert_num = 0 if self.quant_config is None: self.quant_method = AscendUnquantizedFusedMoEMethod( @@ -210,15 +207,24 @@ class AscendFusedMoE(FusedMoE): vllm_config = get_current_vllm_config() self.e_score_correction_bias.data = self.e_score_correction_bias.data.to( dtype=vllm_config.model_config.dtype) + + # init moe. 
+ if vllm_version_is("0.11.0"): + self.local_num_experts, self.expert_map = determine_expert_map( + self.ep_size, self.ep_rank, self.global_num_experts) + else: + self.local_num_experts, self.expert_map, _ = determine_expert_map( + self.ep_size, self.ep_rank, self.global_num_experts) # static eplb initializing with expert_map_path if self.expert_map_path and os.path.exists( self.expert_map_path) and os.access(self.expert_map_path, os.R_OK): self.expert_load_balancer = ExpertLoadBalancer( - self.expert_map_path, self.global_num_experts) + self.expert_map_path, num_experts) self.expert_load_balancer.check_expert_map_tensor() self.global_redundant_expert_num = ( self.expert_load_balancer.get_global_redundant_expert_num()) + self.global_num_experts = num_experts + self.global_redundant_expert_num try: self.local_num_experts, self.expert_map = ( self.expert_load_balancer.get_rank_placement_map( @@ -228,45 +234,21 @@ class AscendFusedMoE(FusedMoE): except Exception as e: logger.warning( f"Init expert map of mtp/eagle when using sample.{e}") - self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() - if self.expert_map is not None and isinstance( - self.expert_map, torch.Tensor): - logger.info_once( - "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" - " number of experts: %s/%s. Experts local to global index map:" - " %s.", self.ep_rank, self.ep_size, self.local_num_experts, - self.global_num_experts, - get_compressed_expert_map(self.expert_map)) + self.global_num_experts, self.ep_size, self.ep_rank).npu() else: - # init moe. 
- if vllm_version_is("0.11.0"): - self.local_num_experts, self.expert_map = determine_expert_map( - self.ep_size, self.ep_rank, self.global_num_experts) - else: - self.local_num_experts, self.expert_map, _ = determine_expert_map( - self.ep_size, self.ep_rank, self.global_num_experts) # dynamic eplb initializing with not expert_map_path if self.dynamic_eplb: - self.global_redundant_expert_num = ascend_config.init_redundancy_expert - self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() - if self.expert_map is not None and isinstance( - self.expert_map, torch.Tensor): - logger.info_once( - "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" - " number of experts: %s/%s. Experts local to global index map:" - " %s.", self.ep_rank, self.ep_size, self.local_num_experts, - self.global_num_experts, - get_compressed_expert_map(self.expert_map)) + self.global_num_experts, self.ep_size, self.ep_rank).npu() + if self.expert_map is not None and isinstance(self.expert_map, + torch.Tensor): + logger.info_once( + "[EP Rank %s/%s] Expert parallelism is enabled. Local/global" + " number of experts: %s/%s. 
Experts local to global index map:" + " %s.", self.ep_rank, self.ep_size, self.local_num_experts, + self.global_num_experts, + get_compressed_expert_map(self.expert_map)) local_num_experts = (torch.sum( self.expert_map != -1) if self.expert_map is not None else self.global_num_experts) diff --git a/vllm_ascend/quantization/w4a8_dynamic.py b/vllm_ascend/quantization/w4a8_dynamic.py index 77f0f4b2..c7f1dfab 100644 --- a/vllm_ascend/quantization/w4a8_dynamic.py +++ b/vllm_ascend/quantization/w4a8_dynamic.py @@ -342,7 +342,7 @@ class AscendW4A8DynamicFusedMoEMethod: scoring_func: str = "softmax", e_score_correction_bias: Optional[torch.Tensor] = None, is_prefill: bool = True, - enable_force_load_balance: bool = True, + enable_force_load_balance: bool = False, log2phy: torch.Tensor = None, global_redundant_expert_num: int = 0, shared_experts: Optional[Any] = None, @@ -371,7 +371,8 @@ class AscendW4A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. # currently it is only activated when doing profile runs. if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, global_num_experts - global_redundant_expert_num) topk_weights = topk_weights.to(x.dtype) diff --git a/vllm_ascend/quantization/w8a8_dynamic.py b/vllm_ascend/quantization/w8a8_dynamic.py index 8bef2567..54dc0f43 100644 --- a/vllm_ascend/quantization/w8a8_dynamic.py +++ b/vllm_ascend/quantization/w8a8_dynamic.py @@ -213,7 +213,7 @@ class AscendW8A8DynamicFusedMoEMethod: scoring_func: str = "softmax", e_score_correction_bias: Optional[torch.Tensor] = None, is_prefill: bool = True, - enable_force_load_balance: bool = True, + enable_force_load_balance: bool = False, log2phy: torch.Tensor = None, global_redundant_expert_num: int = 0, shared_experts: Optional[Any] = None, @@ -242,7 +242,8 @@ class AscendW8A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. 
# currently it is only activated when doing profile runs. if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, global_num_experts - global_redundant_expert_num) topk_weights = topk_weights.to(self.in_dtype) diff --git a/vllm_ascend/torchair/ops/torchair_fused_moe.py b/vllm_ascend/torchair/ops/torchair_fused_moe.py index 8cb2ff9e..415aeb62 100644 --- a/vllm_ascend/torchair/ops/torchair_fused_moe.py +++ b/vllm_ascend/torchair/ops/torchair_fused_moe.py @@ -43,8 +43,7 @@ from vllm.model_executor.layers.quantization.base_config import \ from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ascend_forward_context import FusedMoEState from vllm_ascend.distributed.parallel_state import get_mc2_group -from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map, - determine_default_log2phy_map) +from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.quantization.quant_config import AscendFusedMoEMethod from vllm_ascend.torchair.ops.sequence_parallel import MetadataForPadding @@ -1042,7 +1041,7 @@ class TorchairAscendFusedMoE(FusedMoE): self.expert_map_path) and os.access(self.expert_map_path, os.R_OK): self.expert_load_balancer = ExpertLoadBalancer( - self.expert_map_path, self.global_num_experts) + self.expert_map_path, num_experts) self.expert_load_balancer.check_expert_map_tensor() self.global_redundant_expert_num = ( self.expert_load_balancer.get_global_redundant_expert_num()) @@ -1052,15 +1051,14 @@ class TorchairAscendFusedMoE(FusedMoE): self.moe_instance_id, self.ep_rank)) self.log2phy = self.expert_load_balancer.get_rank_log2phy_map( self.moe_instance_id, self.ep_rank).npu() + self.global_num_experts = num_experts + self.global_redundant_expert_num except Exception as e: logger.warning( f"Init expert map of mtp/eagle when using sample.{e}") - 
self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) + self.local_num_experts, self.expert_map = determine_expert_map( + self.ep_size, self.ep_rank, self.global_num_experts) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() + self.global_num_experts, self.ep_size, self.ep_rank).npu() if self.expert_map is not None and isinstance( self.expert_map, torch.Tensor): logger.info_once( @@ -1079,13 +1077,8 @@ class TorchairAscendFusedMoE(FusedMoE): self.ep_size, self.ep_rank, self.global_num_experts) # dynamic eplb initializing with not expert_map_path if self.dynamic_eplb: - self.global_redundant_expert_num = ascend_config.init_redundancy_expert - self.local_num_experts, self.expert_map = determine_default_expert_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num) self.log2phy = determine_default_log2phy_map( - self.global_num_experts, self.ep_size, self.ep_rank, - self.global_redundant_expert_num).npu() + self.global_num_experts, self.ep_size, self.ep_rank).npu() if self.expert_map is not None and isinstance( self.expert_map, torch.Tensor): logger.info_once( diff --git a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py index bc0a8d35..d8ac2f93 100644 --- a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py +++ b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py @@ -990,7 +990,9 @@ class TorchairAscendW8A8DynamicFusedMoEMethod: # to avoid accumulating too much tokens on a single rank. # currently it is only activated when doing profile runs. 
if enable_force_load_balance: - topk_ids = torch.randint_like(topk_ids, 0, global_num_experts) + topk_ids = torch.randint_like( + topk_ids, 0, + global_num_experts - global_redundant_expert_num) topk_weights = topk_weights.to(x.dtype) if fused_moe_state == FusedMoEState.AllGatherEP: