Files
xc-llm-ascend/vllm_ascend/eplb/eplb_updator.py
LI SHENGYONG ecf2fa482e [EPLB][Bugfix] Get expert map from layers (#5817)
### What this PR does / why we need it?
The initialization method of expert_map used by the eplb module is
different from that used by the fused_moe module. This PR deletes the
expert_map initialization method used by the eplb module to make the
initialization methods consistent.

#### before bugfix
self._expert_map=tensor([64, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, 1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2,
3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22,
23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58,
59, 60, 61,62, 63], device='npu:1', dtype=torch.int32)

self.shared_dict["expert_maps"][0]=tensor([-1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,
18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35,
36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,
54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64]], dtype=torch.int32)

### How was this patch tested?

#### qwen3-235B-w8a8 aime
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 86.67 |

- vLLM version: v0.13.0
- vLLM main:
2f4e6548ef

Signed-off-by: shenchuxiaofugui <1311027364@qq.com>
2026-01-14 09:16:51 +08:00

240 lines
9.0 KiB
Python

#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
# TODO: Remove this updator once https://github.com/vllm-project/vllm/issues/22246 is merged in vLLM.
import numpy
import torch
import torch.distributed as dist
import vllm.envs as envs
from vllm.logger import logger
from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
from vllm_ascend.eplb.core.eplb_worker import EplbProcess
class EplbUpdator:
    """Drive the expert-parallel load balancing (EPLB) cycle from the model runner.

    The updator keeps an iteration counter (``cur_iterations``) that is advanced
    once per ``forward_end`` call. With ``U = num_iterations_eplb_update``,
    ``W = num_wait_worker_iterations`` and ``L = num_moe_layers`` one cycle is:

    * ``cur == U - 1``: ``forward_end`` gathers the expert load and wakes the
      EPLB worker through ``eplb_process.planner_q``.
    * ``cur == U + W - 1``: ``take_update_info_from_eplb_process`` blocks on
      ``eplb_process.block_update_q`` to fetch the worker's result.
    * ``U + W <= cur < U + W + L``: each step ``forward_before`` consumes one
      entry of the fetched update info and ``forward_end`` applies it.
    * after the increment reaches ``U + W + L`` the cycle is finished; the
      counter is reset to 0 unless ``gate_eplb`` is set.

    ``set_adaptor`` must be called before any method that reads
    ``self.adaptor`` or ``self.num_moe_layers``.
    """

    def __init__(self, ascend_config, loader, eplb_process: "EplbProcess",
                 process):
        """Store collaborators and read the EPLB settings from ``ascend_config``.

        Args:
            ascend_config: Configuration object; this class reads
                ``expert_map_path``, ``expert_map_record_path``,
                ``num_iterations_eplb_update``, ``num_wait_worker_iterations``
                and ``gate_eplb`` from it.
            loader: Object used for the weight transfer; this class calls
                ``set_log2phy_map``, ``generate_expert_d2d_transfer_task``,
                ``asyn_expert_weight_transfer`` and
                ``update_expert_map_and_weight`` on it.
            eplb_process: Provides ``shared_dict``, ``planner_q`` and
                ``block_update_q``.
            process: Handle of the launched EPLB process (``pid``,
                ``is_alive``, ``terminate`` and ``join`` are used).
        """
        self.ascend_config = ascend_config
        # init_eplb reads self.ascend_config, so it has to run after the
        # assignment above.
        self.init_eplb(self.ascend_config.expert_map_path, process)
        self.eplb_loader = loader
        self.eplb_process = eplb_process
        self.shared_dict = self.eplb_process.shared_dict
        # layer index -> peak-to-average load ratio of the last gather.
        self.moe_imbalance_dict: dict[int, float] = {}

    def set_adaptor(self, adaptor):
        """Attach the model adaptor and cache its layer and expert counts."""
        self.adaptor = adaptor
        self.num_moe_layers = self.adaptor.num_moe_layers
        self.global_expert_num = self.adaptor.global_expert_num

    def init_eplb(self, expert_map_path, process):
        """Initialise counters and settings of the EPLB schedule.

        Args:
            expert_map_path: Stored as ``self.expert_map_path``.
            process: Handle of the EPLB process, stored as ``self.process``.
        """
        self.rank_id = dist.get_rank()
        self.num_expert_load_gather = 10
        self.periodic_load_gather = True
        self.num_iterations_eplb_update: int = (
            self.ascend_config.num_iterations_eplb_update)
        EPLBParamUtils.check_iterations(self.num_iterations_eplb_update)
        self.expert_map_path = expert_map_path
        self.expert_map_record_path = self.ascend_config.expert_map_record_path

        # Best effort: if the flag cannot be read for any reason it is treated
        # as disabled, which is the same fallback the flag being falsy gives.
        try:
            load_collecting = bool(envs.VLLM_ALLOW_EXPERT_LOAD_COLLECTING)
        except Exception:
            load_collecting = False
        if not load_collecting:
            self.num_expert_load_gather = self.num_iterations_eplb_update
            self.periodic_load_gather = False

        self.gate_eplb = self.ascend_config.gate_eplb

        # Handles filled by the loader's asynchronous weight transfer.
        self.reqs = []
        # Pending update entries fetched from the EPLB process.
        self.update_info_all = []

        self.cur_iterations: int = 0

        self.num_wait_worker_iterations: int = (
            self.ascend_config.num_wait_worker_iterations)
        EPLBParamUtils.check_iterations(self.num_wait_worker_iterations)

        self.process = process

        logger.info(
            f"[ModelRunner] Launched EPLB process (pid={self.process.pid})")

    def update_iteration(self):
        """Advance the iteration counter and finish the cycle when it is due."""
        self.cur_iterations += 1
        cycle_length = (self.num_iterations_eplb_update +
                        self.num_wait_worker_iterations + self.num_moe_layers)
        if self.cur_iterations != cycle_length:
            return

        logger.info("Finish expert parallel load balancing.")
        if self.expert_map_record_path is not None:
            self.adaptor._export_tensor_to_file(
                self.shared_dict["expert_maps"], self.expert_map_record_path)

        self.adaptor.model.clear_all_moe_loads()
        # With gate_eplb set the counter keeps growing, so no flag of the
        # schedule matches again and the cycle runs only once.
        if not self.gate_eplb:
            self.cur_iterations = 0

    def get_update_info_flag(self):
        """Return True on the iteration where the worker result is fetched."""
        return self.cur_iterations == (self.num_iterations_eplb_update +
                                       self.num_wait_worker_iterations - 1)

    def wakeup_eplb_worker_flag(self):
        """Return True on the iteration where the EPLB worker is woken up."""
        return self.cur_iterations == (self.num_iterations_eplb_update - 1)

    def update_expert_weight_flag(self):
        """Return True while the per-layer weight updates are in progress."""
        weight_update_counter = self.cur_iterations - (
            self.num_iterations_eplb_update + self.num_wait_worker_iterations)
        return 0 <= weight_update_counter < self.num_moe_layers

    def wakeup_eplb_worker(self):
        """Signal the EPLB worker by putting a token on its planner queue."""
        self.eplb_process.planner_q.put(1)

    def forward_before(self):
        """Start the transfer for the next pending update entry, if one is due."""
        if not self.update_expert_weight_flag():
            return

        (expert_send_info, expert_recv_info, updated_expert_map, log2phy_map,
         layer_id) = self.update_info_all.pop(0)

        log2phy_map_this_rank = torch.from_numpy(numpy.array(log2phy_map))
        self.eplb_loader.set_log2phy_map(log2phy_map_this_rank)

        updated_expert_map_this_rank = torch.from_numpy(
            numpy.array(updated_expert_map))
        # layer_id is offset by the number of dense layers before it is
        # handed to the loader.
        self.eplb_loader.generate_expert_d2d_transfer_task(
            expert_send_info, expert_recv_info, updated_expert_map_this_rank,
            layer_id + self.adaptor.num_dense_layers)

        # set asynchronous stream for d2d expert weight update
        self.reqs = []
        self.eplb_loader.asyn_expert_weight_transfer(self.reqs)

    def take_update_info_from_eplb_process(self):
        """Fetch the update info from the EPLB process when it is due.

        ``block_update_q.get()`` is called without a timeout, so this waits
        until the EPLB process has put its result on the queue.
        """
        # Batch after eplb process being triggered, get update info provided by eplb process
        if self.get_update_info_flag():
            self.update_info_all = self.eplb_process.block_update_q.get()

    def forward_end(self):
        """Run the end-of-step work of the schedule and advance the counter."""
        if self.wakeup_eplb_worker_flag():
            self.compute_and_set_moe_load(is_clear=True)
            self.wakeup_eplb_worker()

        # The update is not applied when an expert map record path is
        # configured.
        if (self.update_expert_weight_flag()
                and self.expert_map_record_path is None):
            self.eplb_loader.update_expert_map_and_weight(self.reqs)

        self.update_iteration()

    def compute_and_set_moe_load(self, is_clear=False):
        """Gather the expert load of all ranks and publish it in ``shared_dict``.

        Args:
            is_clear: Accepted for interface compatibility; this method does
                not read it.

        Returns:
            The gathered load tensor. When ``torch.distributed`` is
            initialised it is the all-gather result with the first two axes
            swapped, otherwise the local load with an axis of size 1 inserted
            at position 1. NOTE(review): presumably laid out as
            (layer, rank, expert) - confirm against the adaptor.
        """
        local_load = self.adaptor.get_rank_expert_workload()

        if dist.is_initialized():
            self.world_size = dist.get_world_size()
            self.device = local_load.device
            # A fresh buffer is allocated on every call; the returned tensor
            # is a view of it, so it is never overwritten by a later gather.
            self._gather_buffer = torch.empty(
                (self.world_size, *local_load.shape),
                dtype=local_load.dtype,
                device=self.device)
            dist.all_gather_into_tensor(self._gather_buffer, local_load)
            moe_load = self._gather_buffer.permute(1, 0, 2)
        else:
            self._gather_buffer = None
            moe_load = local_load.unsqueeze(1)

        self.shared_dict["moe_load"] = moe_load.cpu()
        logger.debug(
            f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
        )

        # Imbalance statistics are only computed and logged on rank 0.
        if dist.is_initialized() and dist.get_rank() == 0:
            self.compute_moe_imbalance(moe_load)
            self.summarize_moe_imbalance()

        return moe_load

    def compute_moe_imbalance(self, moe_load: torch.Tensor):
        """Recompute the per-layer peak-to-average load ratio.

        The last axis of ``moe_load`` is summed; for every index of the first
        axis the ratio ``max / (mean + 1e-6)`` over the remaining axis is
        stored in ``self.moe_imbalance_dict``.
        """
        self.moe_imbalance_dict.clear()

        layer_card_load = moe_load.sum(dim=-1).cpu().float()

        for layer_idx, layer_load in enumerate(layer_card_load):
            mean_load = layer_load.mean().item()
            max_load = layer_load.max().item()

            # The epsilon keeps the ratio finite when a layer saw no load.
            moe_load_imbalance = max_load / (mean_load + 1e-6)

            logger.debug(f"[ModelRunner][MOE_load_stats][Layer {layer_idx}] "
                         f"PAR={moe_load_imbalance:.4f}")

            self.moe_imbalance_dict[layer_idx] = moe_load_imbalance

    def summarize_moe_imbalance(self):
        """Log mean, max and min of the stored per-layer imbalance ratios."""
        values = list(self.moe_imbalance_dict.values())
        if not values:
            logger.info("[MOE_load_stats] No data available.")
            return

        avg_imbalance = sum(values) / len(values)
        max_imbalance = max(values)
        min_imbalance = min(values)

        logger.info(
            f"[ModelRunner][MOE_load_stats] Peak-to-Average-Ratio: "
            f"Mean={avg_imbalance:.4f}, Max={max_imbalance:.4f}, Min={min_imbalance:.4f}"
        )

    def warm_up_eplb(self):
        """Publish the expert maps and exchange a dummy tensor with every rank.

        ``self.device`` and ``self.world_size`` are set by
        ``compute_and_set_moe_load`` when ``torch.distributed`` is initialised.
        NOTE(review): the exchange presumably warms up the point-to-point
        links used by the later weight transfer - confirm.
        """
        self.shared_dict["expert_maps"] = self.adaptor.get_global_expert_map()
        self.compute_and_set_moe_load()

        # The payload is irrelevant: the same uninitialised 1-element tensor
        # is used for every send and receive.
        src_tensor = torch.empty((1, ), device=self.device)
        self_rank = dist.get_rank()

        peer_ranks = [
            rank for rank in range(self.world_size) if rank != self_rank
        ]
        # All sends are queued before all receives, as in the original order.
        comm_op_list = [
            dist.P2POp(dist.isend, src_tensor, dst_rank)
            for dst_rank in peer_ranks
        ]
        comm_op_list.extend(
            dist.P2POp(dist.irecv, src_tensor, src_rank)
            for src_rank in peer_ranks)

        # With a single rank there is nothing to exchange; waiting only inside
        # this guard avoids touching an unassigned ``reqs``.
        if comm_op_list:
            reqs = dist.batch_isend_irecv(comm_op_list)
            for req in reqs:
                req.wait()

    def shutdown(self):
        """
        Clean up the EPLB process.
        """
        if self.process.is_alive():
            self.process.terminate()
            self.process.join()
            logger.info("[ModelRunner] EPLB process terminated")