diff --git a/tests/ut/eplb/core/test_eplb_utils.py b/tests/ut/eplb/core/test_eplb_utils.py
index 29ee427..4387800 100644
--- a/tests/ut/eplb/core/test_eplb_utils.py
+++ b/tests/ut/eplb/core/test_eplb_utils.py
@@ -64,21 +64,17 @@ def test_generate_log2phy_map_multiple_rank_holding(monkeypatch):
 
 
 def test_determine_default_log2phy_map_world_size_1():
-    log2phy = eplb_utils.determine_default_log2phy_map(
-        global_expert_num=3,
-        world_size=1,
-        rank_id=0,
-        global_redundant_expert_num=0)
+    log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=3,
+                                                       world_size=1,
+                                                       rank_id=0)
     assert log2phy.shape == (3, )
     assert (log2phy >= 0).all()
 
 
 def test_determine_default_log2phy_map_world_size_multiple():
-    log2phy = eplb_utils.determine_default_log2phy_map(
-        global_expert_num=6,
-        world_size=2,
-        rank_id=1,
-        global_redundant_expert_num=1)
+    log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=6,
+                                                       world_size=2,
+                                                       rank_id=1)
     assert log2phy.shape == (6, )
     assert (log2phy >= 0).all()
 
diff --git a/tests/ut/ops/test_expert_load_balancer.py b/tests/ut/ops/test_expert_load_balancer.py
index 97beada..f7f6847 100644
--- a/tests/ut/ops/test_expert_load_balancer.py
+++ b/tests/ut/ops/test_expert_load_balancer.py
@@ -48,8 +48,7 @@ class TestExpertLoadBalancer(TestBase):
         with open(json_file, 'r') as f:
             self.expert_map: MockData = json.load(f)
 
-        self.expert_load_balancer = ExpertLoadBalancer(json_file,
-                                                       global_expert_num=8)
+        self.expert_load_balancer = ExpertLoadBalancer(json_file, 8)
 
     def test_init(self):
 
@@ -83,7 +82,7 @@ class TestExpertLoadBalancer(TestBase):
         )
         self.assertEqual(expert_placement_map.shape,
                          (self.expert_load_balancer.layers_num,
-                          self.expert_load_balancer.ranks_num, 8))
+                          self.expert_load_balancer.ranks_num, 10))
         self.assertTrue(torch.all(expert_placement_map >= -1))
 
     def test_generate_log2phy_expert_map(self):
@@ -91,7 +90,7 @@ class TestExpertLoadBalancer(TestBase):
         log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map(
             layer_id)
         self.assertEqual(log2phy_map.shape,
-                         (self.expert_load_balancer.ranks_num, 8))
+                         (self.expert_load_balancer.ranks_num, 10))
         self.assertTrue(torch.all(log2phy_map >= -1))
 
     @mock.patch("torch_npu.npu._lazy_init")
@@ -102,7 +101,7 @@ class TestExpertLoadBalancer(TestBase):
         rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
             layer_id, rank_id)
         self.assertEqual(rank_local_expert_num, 5)
-        expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
+        expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0, -1, -1],
                                        dtype=torch.int32).to(
                                            rank_expert_map.device)
         self.assertTrue(rank_expert_map.equal(expected_tensor))
@@ -110,7 +109,7 @@ class TestExpertLoadBalancer(TestBase):
         rank_id = 1
         rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
             layer_id, rank_id)
-        expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
+        expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3, -1, -1],
                                        dtype=torch.int32).to(
                                            rank_expert_map.device)
         self.assertTrue(rank_expert_map.equal(expected_tensor))
@@ -120,7 +119,7 @@ class TestExpertLoadBalancer(TestBase):
         rank_id = 0
         log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
             layer_id, rank_id)
-        expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
+        expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0, -1, -1],
                                        dtype=torch.int32).to(
                                            log2phy_map.device)
         self.assertTrue(log2phy_map.equal(expected_tensor))
@@ -128,7 +127,7 @@ class TestExpertLoadBalancer(TestBase):
         rank_id = 1
         log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
             layer_id, rank_id)
-        expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
+        expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8, -1, -1],
                                        dtype=torch.int32).to(
                                            log2phy_map.device)
         self.assertTrue(log2phy_map.equal(expected_tensor))
diff --git a/vllm_ascend/eplb/core/eplb_utils.py b/vllm_ascend/eplb/core/eplb_utils.py
index 0a55868..d7fd17e 100644
--- a/vllm_ascend/eplb/core/eplb_utils.py
+++ b/vllm_ascend/eplb/core/eplb_utils.py
@@ -88,8 +88,7 @@ def generate_log2phy_map(expert_map):
     return log2phy_map
 
 
-def determine_default_log2phy_map(global_expert_num, world_size, rank_id,
-                                  global_redundant_expert_num):
+def determine_default_log2phy_map(global_expert_num, world_size, rank_id):
     if world_size == 1:
         local_ids = torch.arange(global_expert_num, dtype=torch.int32)
         expert_map_all = local_ids.unsqueeze(0).expand(world_size, -1)
diff --git a/vllm_ascend/ops/common_fused_moe.py b/vllm_ascend/ops/common_fused_moe.py
index e33835b..d560710 100644
--- a/vllm_ascend/ops/common_fused_moe.py
+++ b/vllm_ascend/ops/common_fused_moe.py
@@ -33,8 +33,7 @@ from vllm.model_executor.layers.shared_fused_moe import SharedFusedMoE
 from vllm_ascend.ascend_config import get_ascend_config
 from vllm_ascend.ascend_forward_context import MoECommType
 from vllm_ascend.distributed.parallel_state import get_mc2_group
-from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map,
-                                              determine_default_log2phy_map)
+from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map
 from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
 from vllm_ascend.ops.moe.experts_selector import select_experts
 from vllm_ascend.ops.moe.moe_comm_method import setup_moe_comm_method
@@ -149,10 +148,8 @@ class AscendFusedMoE(FusedMoE):
         AscendFusedMoE.moe_counter += 1
         self.moe_instance_id = AscendFusedMoE.moe_counter
 
-        self.global_num_experts = num_experts
         self.expert_map = None
         self.log2phy = None
-        self.global_redundant_expert_num = 0
 
         if self.quant_config is None:
             self.quant_method = AscendUnquantizedFusedMoEMethod(
@@ -181,10 +178,11 @@ class AscendFusedMoE(FusedMoE):
                 self.expert_map_path) and os.access(self.expert_map_path,
                                                     os.R_OK):
             self.expert_load_balancer = ExpertLoadBalancer(
-                self.expert_map_path, self.global_num_experts)
+                self.expert_map_path, num_experts)
             self.expert_load_balancer.check_expert_map_tensor()
             self.global_redundant_expert_num = (
                 self.expert_load_balancer.get_global_redundant_expert_num())
+            self.global_num_experts = num_experts + self.global_redundant_expert_num
             try:
                 self.local_num_experts, self.expert_map = (
                     self.expert_load_balancer.get_rank_placement_map(
@@ -194,41 +192,26 @@ class AscendFusedMoE(FusedMoE):
             except Exception as e:
                 logger.warning(
                     f"Init expert map of mtp/eagle when using sample.{e}")
-                self.local_num_experts, self.expert_map = determine_default_expert_map(
-                    self.global_num_experts, self.ep_size, self.ep_rank,
-                    self.global_redundant_expert_num)
+                self.local_num_experts, self.expert_map = determine_expert_map(
+                    self.ep_size, self.ep_rank, self.global_num_experts)
             self.log2phy = determine_default_log2phy_map(
-                self.global_num_experts, self.ep_size, self.ep_rank,
-                self.global_redundant_expert_num).npu()
-            if self.expert_map is not None and isinstance(
-                    self.expert_map, torch.Tensor):
-                logger.info_once(
-                    "[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
-                    " number of experts: %s/%s. Experts local to global index map:"
-                    " %s.", self.ep_rank, self.ep_size, self.local_num_experts,
-                    self.global_num_experts,
-                    get_compressed_expert_map(self.expert_map))
+                self.global_num_experts, self.ep_size, self.ep_rank).npu()
         else:
             # init moe.
             self.local_num_experts, self.expert_map = determine_expert_map(
                 self.ep_size, self.ep_rank, self.global_num_experts)
             # dynamic eplb initializing with not expert_map_path
             if self.dynamic_eplb:
-                self.global_redundant_expert_num = ascend_config.init_redundancy_expert
-                self.local_num_experts, self.expert_map = determine_default_expert_map(
-                    self.global_num_experts, self.ep_size, self.ep_rank,
-                    self.global_redundant_expert_num)
                 self.log2phy = determine_default_log2phy_map(
-                    self.global_num_experts, self.ep_size, self.ep_rank,
-                    self.global_redundant_expert_num).npu()
-            if self.expert_map is not None and isinstance(
-                    self.expert_map, torch.Tensor):
-                logger.info_once(
-                    "[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
-                    " number of experts: %s/%s. Experts local to global index map:"
-                    " %s.", self.ep_rank, self.ep_size, self.local_num_experts,
-                    self.global_num_experts,
-                    get_compressed_expert_map(self.expert_map))
+                    self.global_num_experts, self.ep_size, self.ep_rank).npu()
+        if self.expert_map is not None and isinstance(self.expert_map,
+                                                      torch.Tensor):
+            logger.info_once(
+                "[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
+                " number of experts: %s/%s. Experts local to global index map:"
+                " %s.", self.ep_rank, self.ep_size, self.local_num_experts,
+                self.global_num_experts,
+                get_compressed_expert_map(self.expert_map))
         local_num_experts = (torch.sum(
             self.expert_map != -1) if self.expert_map is not None else
                              self.global_num_experts)
@@ -302,7 +285,6 @@ class AscendFusedMoE(FusedMoE):
         # This approach may overlook some extreme scenarios.
         enable_force_load_balance = forward_context.in_profile_run
 
-        forward_context = get_forward_context()
         hidden_states, router_logits = forward_context.moe_comm_method.prepare(
             hidden_states=hidden_states,
             router_logits=router_logits,
diff --git a/vllm_ascend/ops/expert_load_balancer.py b/vllm_ascend/ops/expert_load_balancer.py
index de6a7c5..5b03ffa 100644
--- a/vllm_ascend/ops/expert_load_balancer.py
+++ b/vllm_ascend/ops/expert_load_balancer.py
@@ -8,12 +8,14 @@ import torch.distributed as dist
 
 class ExpertLoadBalancer(object):
 
-    def __init__(self, expert_map_path, global_expert_num):
+    def __init__(self, expert_map_path, num_experts):
         self.expert_map_path = expert_map_path
-        self.global_expert_num = global_expert_num
+        self.num_experts = num_experts
         self.tensor_data = []
         self.expert_map_tensor, self.layers_num, self.ranks_num = (
             self._expert_file_to_tensor())
+        self.global_expert_num = num_experts + self.get_global_redundant_expert_num(
+        )
         self.expert_placement_map = self.generate_expert_placement_map()
 
     def _expert_file_to_tensor(self):
@@ -96,7 +98,7 @@ class ExpertLoadBalancer(object):
     def get_global_redundant_expert_num(self):
         global_redundant_expert_num = (
             len(self.expert_map_tensor[0][0]) * self.ranks_num -
-            self.global_expert_num)
+            self.num_experts)
         return global_redundant_expert_num
 
     def check_expert_map_tensor(self):
diff --git a/vllm_ascend/quantization/w4a8_dynamic.py b/vllm_ascend/quantization/w4a8_dynamic.py
index 2133607..11dc97d 100644
--- a/vllm_ascend/quantization/w4a8_dynamic.py
+++ b/vllm_ascend/quantization/w4a8_dynamic.py
@@ -371,7 +371,8 @@ class AscendW4A8DynamicFusedMoEMethod:
         # to avoid accumulating too much tokens on a single rank.
         # currently it is only activated when doing profile runs.
         if enable_force_load_balance:
-            topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
+            topk_ids = torch.randint_like(
+                topk_ids, 0, global_num_experts - global_redundant_expert_num)
 
         topk_weights = topk_weights.to(x.dtype)
 
diff --git a/vllm_ascend/quantization/w8a8_dynamic.py b/vllm_ascend/quantization/w8a8_dynamic.py
index 0f96e8c..59b0479 100644
--- a/vllm_ascend/quantization/w8a8_dynamic.py
+++ b/vllm_ascend/quantization/w8a8_dynamic.py
@@ -197,7 +197,7 @@ class AscendW8A8DynamicFusedMoEMethod:
         scoring_func: str = "softmax",
         e_score_correction_bias: Optional[torch.Tensor] = None,
         is_prefill: bool = True,
-        enable_force_load_balance: bool = True,
+        enable_force_load_balance: bool = False,
         log2phy: torch.Tensor = None,
         global_redundant_expert_num: int = 0,
         shared_experts: Optional[Any] = None,
@@ -225,7 +225,8 @@ class AscendW8A8DynamicFusedMoEMethod:
         # to avoid accumulating too much tokens on a single rank.
         # currently it is only activated when doing profile runs.
         if enable_force_load_balance:
-            topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
+            topk_ids = torch.randint_like(
+                topk_ids, 0, global_num_experts - global_redundant_expert_num)
 
         if self.use_aclgraph:
             moe_comm_method = get_forward_context().moe_comm_method
diff --git a/vllm_ascend/torchair/ops/torchair_fused_moe.py b/vllm_ascend/torchair/ops/torchair_fused_moe.py
index 7a14ca5..d42d023 100644
--- a/vllm_ascend/torchair/ops/torchair_fused_moe.py
+++ b/vllm_ascend/torchair/ops/torchair_fused_moe.py
@@ -1058,8 +1058,7 @@ class TorchairAscendFusedMoE(FusedMoE):
                     self.global_num_experts, self.ep_size, self.ep_rank,
                     self.global_redundant_expert_num)
             self.log2phy = determine_default_log2phy_map(
-                self.global_num_experts, self.ep_size, self.ep_rank,
-                self.global_redundant_expert_num).npu()
+                self.global_num_experts, self.ep_size, self.ep_rank).npu()
             if self.expert_map is not None and isinstance(
                     self.expert_map, torch.Tensor):
                 logger.info_once(
@@ -1079,8 +1078,7 @@ class TorchairAscendFusedMoE(FusedMoE):
                     self.global_num_experts, self.ep_size, self.ep_rank,
                     self.global_redundant_expert_num)
                 self.log2phy = determine_default_log2phy_map(
-                    self.global_num_experts, self.ep_size, self.ep_rank,
-                    self.global_redundant_expert_num).npu()
+                    self.global_num_experts, self.ep_size, self.ep_rank).npu()
             if self.expert_map is not None and isinstance(
                     self.expert_map, torch.Tensor):
                 logger.info_once(
diff --git a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py
index bc0a8d3..f639270 100644
--- a/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py
+++ b/vllm_ascend/torchair/quantization/torchair_w8a8_dynamic.py
@@ -925,7 +925,7 @@ class TorchairAscendW8A8DynamicFusedMoEMethod:
         scoring_func: str = "softmax",
         e_score_correction_bias: Optional[torch.Tensor] = None,
         is_prefill: bool = True,
-        enable_force_load_balance: bool = True,
+        enable_force_load_balance: bool = False,
         log2phy: torch.Tensor = None,
         global_redundant_expert_num: int = 0,
         shared_experts: Optional[Any] = None,
@@ -990,7 +990,9 @@ class TorchairAscendW8A8DynamicFusedMoEMethod:
         # to avoid accumulating too much tokens on a single rank.
         # currently it is only activated when doing profile runs.
         if enable_force_load_balance:
-            topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
+            topk_ids = torch.randint_like(
+                topk_ids, 0,
+                global_num_experts - global_redundant_expert_num)
 
         topk_weights = topk_weights.to(x.dtype)
         if fused_moe_state == FusedMoEState.AllGatherEP:
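
Below is a minimal standalone sketch (not part of the patch) of the accounting this change centralizes: ExpertLoadBalancer.get_global_redundant_expert_num() derives the redundant-expert count from the expert-map JSON (physical slots per rank times rank count, minus the logical num_experts), global_num_experts then becomes num_experts plus that surplus, and the forced-load-balance path samples topk_ids only from the logical range global_num_experts - global_redundant_expert_num. The concrete numbers (8 logical experts, 5 slots per rank, 2 ranks) mirror the unit-test fixture and are illustrative only.

import torch

num_experts = 8            # logical experts from the model config
experts_per_rank = 5       # physical expert slots per rank in the JSON map
ranks_num = 2

# Same arithmetic as ExpertLoadBalancer.get_global_redundant_expert_num():
# physical slots minus logical experts gives the redundant count.
global_redundant_expert_num = experts_per_rank * ranks_num - num_experts   # -> 2
# Same arithmetic as the new global_num_experts assignment in AscendFusedMoE.
global_num_experts = num_experts + global_redundant_expert_num             # -> 10

# During profile runs, forced load balance now draws only logical expert ids,
# matching the new upper bound in the quantized MoE methods.
topk_ids = torch.zeros(4, 2, dtype=torch.int64)
topk_ids = torch.randint_like(
    topk_ids, 0, global_num_experts - global_redundant_expert_num)
assert int(topk_ids.max()) < num_experts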