[EPLB] Display the expert hotness comparison before and after eplb. (#6877)

### What this PR does / why we need it?
To intuitively show the effect of the eplb algorithm, we print the
expert heat before and after eplb.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

![Snipaste_2026-02-28_17-23-42](https://github.com/user-attachments/assets/db1dadd1-cf96-44da-af34-57d41ccf412f)


- vLLM version: v0.16.0
- vLLM main:
15d76f74e2

Signed-off-by: shenchuxiaofugui <1311027364@qq.com>
This commit is contained in:
LI SHENGYONG
2026-03-06 09:53:29 +08:00
committed by GitHub
parent 18b52afe2b
commit ccd00798f3
3 changed files with 42 additions and 38 deletions

View File

@@ -66,7 +66,7 @@ vllm serve Qwen/Qwen3-235B-A22 \
--tensor-parallel-size 16 \ --tensor-parallel-size 16 \
--enable-expert-parallel \ --enable-expert-parallel \
--additional-config '{ --additional-config '{
"expert_map_path": "/path/to/eplb.json" "eplb_config": {"expert_map_path": "/path/to/eplb.json"}
}' }'
``` ```

View File

@@ -17,6 +17,7 @@
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from typing import Any from typing import Any
import numpy as np
import torch import torch
import torch.distributed as dist import torch.distributed as dist
from vllm.logger import logger from vllm.logger import logger
@@ -60,6 +61,16 @@ class EplbWorker:
old_placement = self.global2local(self.old_expert_maps, self.num_local_experts) old_placement = self.global2local(self.old_expert_maps, self.num_local_experts)
_, _, new_placement = self.calculate_rebalance_experts(load_info, old_placement) _, _, new_placement = self.calculate_rebalance_experts(load_info, old_placement)
if self.rank_id == 0:
hotness = self._calculate_hotness(old_placement, load_info)
current_mean, current_max = self._compute_imbalance(old_placement, hotness)
update_mean, update_max = self._compute_imbalance(new_placement, hotness)
logger.info(
"[Expert Hotness] Current: mean={:.3f}, max={:.3f}, Updated: mean={:.3f}, max={:.3f}".format(
current_mean, current_max, update_mean, update_max
)
)
if not torch.is_tensor(new_placement): if not torch.is_tensor(new_placement):
new_placement = torch.tensor(new_placement) new_placement = torch.tensor(new_placement)
self.check_expert_placement(old_placement, new_placement) self.check_expert_placement(old_placement, new_placement)
@@ -251,6 +262,36 @@ class EplbWorker:
return list(zip(send_all, recv_all, maps, log2phy_all, layer_ids)) return list(zip(send_all, recv_all, maps, log2phy_all, layer_ids))
@staticmethod
def _compute_imbalance(deployment_all_layer, hotness_all_layer: np.ndarray):
imbalance_list = []
deployment_all_layer = np.array(deployment_all_layer)
for deployment, hotness in zip(deployment_all_layer, hotness_all_layer):
counts = np.bincount(deployment.reshape(-1), minlength=hotness.shape[0])
unit_hotness = np.divide(hotness, counts, out=np.zeros_like(hotness, dtype=float), where=counts != 0)
stage_load = unit_hotness[deployment].sum(-1)
stage_par = stage_load.max() / stage_load.mean()
imbalance_list.append(stage_par)
max_val = max(imbalance_list)
mean_val = sum(imbalance_list) / len(imbalance_list)
return mean_val, max_val
@staticmethod
def _calculate_hotness(deployment_all_layer, moe_load_all_layer):
hotnesses = []
num_of_expert = deployment_all_layer.shape[1] * deployment_all_layer.shape[2]
for deployment, rank_load in zip(deployment_all_layer, moe_load_all_layer.numpy()):
hotness = np.zeros(num_of_expert, dtype=rank_load.dtype)
deployment_flat = deployment.ravel()
rank_load_flat = rank_load.ravel()
np.add.at(hotness, deployment_flat, rank_load_flat)
hotnesses.append(hotness)
return np.array(hotnesses)
class EplbProcess: class EplbProcess:
def __init__(self, shared_dict, policy_type: int = 0, enable_d2d: bool = True): def __init__(self, shared_dict, policy_type: int = 0, enable_d2d: bool = True):

View File

@@ -34,7 +34,6 @@ class EplbUpdator:
self.eplb_loader = loader self.eplb_loader = loader
self.eplb_process = eplb_process self.eplb_process = eplb_process
self.shared_dict = self.eplb_process.shared_dict self.shared_dict = self.eplb_process.shared_dict
self.moe_imbalance_dict: dict[int, float] = {}
self.comm_group = get_dynamic_eplb_group() self.comm_group = get_dynamic_eplb_group()
def set_adaptor(self, adaptor: VllmEplbAdaptor): def set_adaptor(self, adaptor: VllmEplbAdaptor):
@@ -137,44 +136,8 @@ class EplbUpdator:
self.shared_dict["moe_load"] = moe_load.cpu() self.shared_dict["moe_load"] = moe_load.cpu()
logger.debug(f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}") logger.debug(f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}")
if dist.get_rank() == 0:
self.compute_moe_imbalance(moe_load)
self.summarize_moe_imbalance()
return moe_load return moe_load
def compute_moe_imbalance(self, moe_load: torch.Tensor):
self.moe_imbalance_dict.clear()
layer_card_load = moe_load.sum(dim=-1).cpu().float()
for layer_idx in range(layer_card_load.size(0)):
layer_load = layer_card_load[layer_idx]
mean_load = layer_load.mean().item()
max_load = layer_load.max().item()
moe_load_imbalance = max_load / (mean_load + 1e-6)
logger.debug(f"[ModelRunner][MOE_load_stats][Layer {layer_idx}] PAR={moe_load_imbalance:.4f}")
self.moe_imbalance_dict[layer_idx] = moe_load_imbalance
def summarize_moe_imbalance(self):
values = list(self.moe_imbalance_dict.values())
if not values:
logger.info("[MOE_load_stats] No data available.")
return
avg_imbalance = sum(values) / len(values)
max_imbalance = max(values)
min_imbalance = min(values)
logger.info(
f"[ModelRunner][MOE_load_stats] Peak-to-Average-Ratio: "
f"Mean={avg_imbalance:.4f}, Max={max_imbalance:.4f}, Min={min_imbalance:.4f}"
)
def warm_up_eplb(self): def warm_up_eplb(self):
self.shared_dict["expert_maps"] = self.adaptor.get_global_expert_map() self.shared_dict["expert_maps"] = self.adaptor.get_global_expert_map()
self.compute_and_set_moe_load() self.compute_and_set_moe_load()