【EPLB】Eplb Redundant Experts Bugfix (#4232)

### What this PR does / why we need it?
Redundant experts bugfix
The calculation logic for redundant experts has been fixed, allowing the
correct number of redundant experts to be calculated using the map.
Therefore, there is no longer a need to set the redundant expert
parameter when passing the map.

### Does this PR introduce _any_ user-facing change?
After configuring the path for experts_map, users do not need to
configure iinit_redundancy_expert.

### How was this patch tested?
The accuracy of EPLB was tested with and without the use of redundant
experts.

---------

Signed-off-by: shenchuxiaofugui <1311027364@qq.com>
This commit is contained in:
LI SHENGYONG
2025-12-03 12:00:05 +08:00
committed by GitHub
parent b6d63bbd52
commit 593a96056c
9 changed files with 45 additions and 65 deletions

View File

@@ -64,21 +64,17 @@ def test_generate_log2phy_map_multiple_rank_holding(monkeypatch):
def test_determine_default_log2phy_map_world_size_1():
log2phy = eplb_utils.determine_default_log2phy_map(
global_expert_num=3,
world_size=1,
rank_id=0,
global_redundant_expert_num=0)
log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=3,
world_size=1,
rank_id=0)
assert log2phy.shape == (3, )
assert (log2phy >= 0).all()
def test_determine_default_log2phy_map_world_size_multiple():
log2phy = eplb_utils.determine_default_log2phy_map(
global_expert_num=6,
world_size=2,
rank_id=1,
global_redundant_expert_num=1)
log2phy = eplb_utils.determine_default_log2phy_map(global_expert_num=6,
world_size=2,
rank_id=1)
assert log2phy.shape == (6, )
assert (log2phy >= 0).all()

View File

@@ -48,8 +48,7 @@ class TestExpertLoadBalancer(TestBase):
with open(json_file, 'r') as f:
self.expert_map: MockData = json.load(f)
self.expert_load_balancer = ExpertLoadBalancer(json_file,
global_expert_num=8)
self.expert_load_balancer = ExpertLoadBalancer(json_file, 8)
def test_init(self):
@@ -83,7 +82,7 @@ class TestExpertLoadBalancer(TestBase):
)
self.assertEqual(expert_placement_map.shape,
(self.expert_load_balancer.layers_num,
self.expert_load_balancer.ranks_num, 8))
self.expert_load_balancer.ranks_num, 10))
self.assertTrue(torch.all(expert_placement_map >= -1))
def test_generate_log2phy_expert_map(self):
@@ -91,7 +90,7 @@ class TestExpertLoadBalancer(TestBase):
log2phy_map = self.expert_load_balancer.generate_log2phy_expert_map(
layer_id)
self.assertEqual(log2phy_map.shape,
(self.expert_load_balancer.ranks_num, 8))
(self.expert_load_balancer.ranks_num, 10))
self.assertTrue(torch.all(log2phy_map >= -1))
@mock.patch("torch_npu.npu._lazy_init")
@@ -102,7 +101,7 @@ class TestExpertLoadBalancer(TestBase):
rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
self.assertEqual(rank_local_expert_num, 5)
expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0],
expected_tensor = torch.tensor([2, -1, 1, 3, -1, 4, -1, 0, -1, -1],
dtype=torch.int32).to(
rank_expert_map.device)
self.assertTrue(rank_expert_map.equal(expected_tensor))
@@ -110,7 +109,7 @@ class TestExpertLoadBalancer(TestBase):
rank_id = 1
rank_local_expert_num, rank_expert_map = self.expert_load_balancer.get_rank_placement_map(
layer_id, rank_id)
expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3],
expected_tensor = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3, -1, -1],
dtype=torch.int32).to(
rank_expert_map.device)
self.assertTrue(rank_expert_map.equal(expected_tensor))
@@ -120,7 +119,7 @@ class TestExpertLoadBalancer(TestBase):
rank_id = 0
log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0],
expected_tensor = torch.tensor([2, 6, 1, 3, 7, 4, 5, 0, -1, -1],
dtype=torch.int32).to(
log2phy_map.device)
self.assertTrue(log2phy_map.equal(expected_tensor))
@@ -128,7 +127,7 @@ class TestExpertLoadBalancer(TestBase):
rank_id = 1
log2phy_map = self.expert_load_balancer.get_rank_log2phy_map(
layer_id, rank_id)
expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8],
expected_tensor = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8, -1, -1],
dtype=torch.int32).to(
log2phy_map.device)
self.assertTrue(log2phy_map.equal(expected_tensor))

View File

@@ -88,8 +88,7 @@ def generate_log2phy_map(expert_map):
return log2phy_map
def determine_default_log2phy_map(global_expert_num, world_size, rank_id,
global_redundant_expert_num):
def determine_default_log2phy_map(global_expert_num, world_size, rank_id):
if world_size == 1:
local_ids = torch.arange(global_expert_num, dtype=torch.int32)
expert_map_all = local_ids.unsqueeze(0).expand(world_size, -1)

View File

@@ -33,8 +33,7 @@ from vllm.model_executor.layers.shared_fused_moe import SharedFusedMoE
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.distributed.parallel_state import get_mc2_group
from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map,
determine_default_log2phy_map)
from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
from vllm_ascend.ops.moe.experts_selector import select_experts
from vllm_ascend.ops.moe.moe_comm_method import setup_moe_comm_method
@@ -149,10 +148,8 @@ class AscendFusedMoE(FusedMoE):
AscendFusedMoE.moe_counter += 1
self.moe_instance_id = AscendFusedMoE.moe_counter
self.global_num_experts = num_experts
self.expert_map = None
self.log2phy = None
self.global_redundant_expert_num = 0
if self.quant_config is None:
self.quant_method = AscendUnquantizedFusedMoEMethod(
@@ -181,10 +178,11 @@ class AscendFusedMoE(FusedMoE):
self.expert_map_path) and os.access(self.expert_map_path,
os.R_OK):
self.expert_load_balancer = ExpertLoadBalancer(
self.expert_map_path, self.global_num_experts)
self.expert_map_path, num_experts)
self.expert_load_balancer.check_expert_map_tensor()
self.global_redundant_expert_num = (
self.expert_load_balancer.get_global_redundant_expert_num())
self.global_num_experts = num_experts + self.global_redundant_expert_num
try:
self.local_num_experts, self.expert_map = (
self.expert_load_balancer.get_rank_placement_map(
@@ -194,41 +192,26 @@ class AscendFusedMoE(FusedMoE):
except Exception as e:
logger.warning(
f"Init expert map of mtp/eagle when using sample.{e}")
self.local_num_experts, self.expert_map = determine_default_expert_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num)
self.local_num_experts, self.expert_map = determine_expert_map(
self.ep_size, self.ep_rank, self.global_num_experts)
self.log2phy = determine_default_log2phy_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num).npu()
if self.expert_map is not None and isinstance(
self.expert_map, torch.Tensor):
logger.info_once(
"[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
" number of experts: %s/%s. Experts local to global index map:"
" %s.", self.ep_rank, self.ep_size, self.local_num_experts,
self.global_num_experts,
get_compressed_expert_map(self.expert_map))
self.global_num_experts, self.ep_size, self.ep_rank).npu()
else:
# init moe.
self.local_num_experts, self.expert_map = determine_expert_map(
self.ep_size, self.ep_rank, self.global_num_experts)
# dynamic eplb initializing with not expert_map_path
if self.dynamic_eplb:
self.global_redundant_expert_num = ascend_config.init_redundancy_expert
self.local_num_experts, self.expert_map = determine_default_expert_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num)
self.log2phy = determine_default_log2phy_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num).npu()
if self.expert_map is not None and isinstance(
self.expert_map, torch.Tensor):
logger.info_once(
"[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
" number of experts: %s/%s. Experts local to global index map:"
" %s.", self.ep_rank, self.ep_size, self.local_num_experts,
self.global_num_experts,
get_compressed_expert_map(self.expert_map))
self.global_num_experts, self.ep_size, self.ep_rank).npu()
if self.expert_map is not None and isinstance(self.expert_map,
torch.Tensor):
logger.info_once(
"[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
" number of experts: %s/%s. Experts local to global index map:"
" %s.", self.ep_rank, self.ep_size, self.local_num_experts,
self.global_num_experts,
get_compressed_expert_map(self.expert_map))
local_num_experts = (torch.sum(
self.expert_map != -1) if self.expert_map is not None else
self.global_num_experts)
@@ -302,7 +285,6 @@ class AscendFusedMoE(FusedMoE):
# This approach may overlook some extreme scenarios.
enable_force_load_balance = forward_context.in_profile_run
forward_context = get_forward_context()
hidden_states, router_logits = forward_context.moe_comm_method.prepare(
hidden_states=hidden_states,
router_logits=router_logits,

View File

@@ -8,12 +8,14 @@ import torch.distributed as dist
class ExpertLoadBalancer(object):
def __init__(self, expert_map_path, global_expert_num):
def __init__(self, expert_map_path, num_experts):
self.expert_map_path = expert_map_path
self.global_expert_num = global_expert_num
self.num_experts = num_experts
self.tensor_data = []
self.expert_map_tensor, self.layers_num, self.ranks_num = (
self._expert_file_to_tensor())
self.global_expert_num = num_experts + self.get_global_redundant_expert_num(
)
self.expert_placement_map = self.generate_expert_placement_map()
def _expert_file_to_tensor(self):
@@ -96,7 +98,7 @@ class ExpertLoadBalancer(object):
def get_global_redundant_expert_num(self):
global_redundant_expert_num = (
len(self.expert_map_tensor[0][0]) * self.ranks_num -
self.global_expert_num)
self.num_experts)
return global_redundant_expert_num
def check_expert_map_tensor(self):

View File

@@ -371,7 +371,8 @@ class AscendW4A8DynamicFusedMoEMethod:
# to avoid accumulating too much tokens on a single rank.
# currently it is only activated when doing profile runs.
if enable_force_load_balance:
topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
topk_ids = torch.randint_like(
topk_ids, 0, global_num_experts - global_redundant_expert_num)
topk_weights = topk_weights.to(x.dtype)

View File

@@ -197,7 +197,7 @@ class AscendW8A8DynamicFusedMoEMethod:
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
is_prefill: bool = True,
enable_force_load_balance: bool = True,
enable_force_load_balance: bool = False,
log2phy: torch.Tensor = None,
global_redundant_expert_num: int = 0,
shared_experts: Optional[Any] = None,
@@ -225,7 +225,8 @@ class AscendW8A8DynamicFusedMoEMethod:
# to avoid accumulating too much tokens on a single rank.
# currently it is only activated when doing profile runs.
if enable_force_load_balance:
topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
topk_ids = torch.randint_like(
topk_ids, 0, global_num_experts - global_redundant_expert_num)
if self.use_aclgraph:
moe_comm_method = get_forward_context().moe_comm_method

View File

@@ -1058,8 +1058,7 @@ class TorchairAscendFusedMoE(FusedMoE):
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num)
self.log2phy = determine_default_log2phy_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num).npu()
self.global_num_experts, self.ep_size, self.ep_rank).npu()
if self.expert_map is not None and isinstance(
self.expert_map, torch.Tensor):
logger.info_once(
@@ -1079,8 +1078,7 @@ class TorchairAscendFusedMoE(FusedMoE):
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num)
self.log2phy = determine_default_log2phy_map(
self.global_num_experts, self.ep_size, self.ep_rank,
self.global_redundant_expert_num).npu()
self.global_num_experts, self.ep_size, self.ep_rank).npu()
if self.expert_map is not None and isinstance(
self.expert_map, torch.Tensor):
logger.info_once(

View File

@@ -925,7 +925,7 @@ class TorchairAscendW8A8DynamicFusedMoEMethod:
scoring_func: str = "softmax",
e_score_correction_bias: Optional[torch.Tensor] = None,
is_prefill: bool = True,
enable_force_load_balance: bool = True,
enable_force_load_balance: bool = False,
log2phy: torch.Tensor = None,
global_redundant_expert_num: int = 0,
shared_experts: Optional[Any] = None,
@@ -990,7 +990,9 @@ class TorchairAscendW8A8DynamicFusedMoEMethod:
# to avoid accumulating too much tokens on a single rank.
# currently it is only activated when doing profile runs.
if enable_force_load_balance:
topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
topk_ids = torch.randint_like(
topk_ids, 0,
global_num_experts - global_redundant_expert_num)
topk_weights = topk_weights.to(x.dtype)
if fused_moe_state == FusedMoEState.AllGatherEP: