[Bugfix] Revert pr4214 multi-stream collect expert hotpot (#5529)

### What this PR does / why we need it?
PR4214 was intended to collect expert heat by processing multiple
streams, which could lead to memory overwriting and accuracy issues.
After communicating with the PR submitter, this PR has been reverted.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
qwen3-moe dynamic eplb
Befor revert
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 43.33 |

After revert 
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 86.67 |

baseline (without eplb)
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 86.67 |
- vLLM version: v0.13.0
- vLLM main:
45c1ca1ca1

Signed-off-by: shenchuxiaofugui <1311027364@qq.com>
This commit is contained in:
LI SHENGYONG
2026-01-07 11:26:47 +08:00
committed by GitHub
parent 25baf6df09
commit cd59323e40
3 changed files with 15 additions and 36 deletions

View File

@@ -23,8 +23,6 @@ from vllm.logger import logger
from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
from vllm_ascend.eplb.core.eplb_worker import EplbProcess
from vllm_ascend.eplb.utils import moe_load_async_stream
from vllm_ascend.utils import npu_stream_switch
class EplbUpdator:
@@ -155,22 +153,21 @@ class EplbUpdator:
self._gather_buffer = None
if dist.is_initialized():
with npu_stream_switch(moe_load_async_stream()):
self.world_size = dist.get_world_size()
self.device = local_load.device
if self._gather_buffer is None:
shape = (self.world_size, *local_load.shape)
self._gather_buffer = torch.empty(shape,
dtype=local_load.dtype,
device=self.device)
self.world_size = dist.get_world_size()
self.device = local_load.device
if self._gather_buffer is None:
shape = (self.world_size, *local_load.shape)
self._gather_buffer = torch.empty(shape,
dtype=local_load.dtype,
device=self.device)
dist.all_gather_into_tensor(self._gather_buffer, local_load)
dist.all_gather_into_tensor(self._gather_buffer, local_load)
moe_load = self._gather_buffer.permute(1, 0, 2)
self.shared_dict["moe_load"] = moe_load.cpu()
logger.debug(
f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
)
moe_load = self._gather_buffer.permute(1, 0, 2)
self.shared_dict["moe_load"] = moe_load.cpu()
logger.debug(
f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
)
else:
moe_load = local_load.unsqueeze(1)
self.shared_dict["moe_load"] = moe_load.cpu()

View File

@@ -18,9 +18,6 @@
import types
import torch
import torch_npu
_MOE_LOAD_ASYNC_STREAM = None
def get_expert_map(self, layer_id):
@@ -78,12 +75,3 @@ def model_register(model, model_config):
model.num_moe_layers = config.num_hidden_layers - model.num_dense_layers
else:
raise NotImplementedError("EPLB is not supported.")
def moe_load_async_stream() -> torch_npu.npu.Stream:
global _MOE_LOAD_ASYNC_STREAM
if _MOE_LOAD_ASYNC_STREAM is None:
# when this function is called before any stream is set,
# we return the default stream.
_MOE_LOAD_ASYNC_STREAM = torch_npu.npu.Stream()
return _MOE_LOAD_ASYNC_STREAM

View File

@@ -32,7 +32,6 @@ from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.distributed.parallel_state import get_mc2_group
from vllm_ascend.eplb.core.eplb_utils import init_eplb_config
from vllm_ascend.eplb.utils import moe_load_async_stream
from vllm_ascend.flash_common3_context import (get_flash_common3_context,
set_flash_common3_context)
from vllm_ascend.ops.fused_moe.experts_selector import (select_experts,
@@ -371,13 +370,8 @@ class AscendFusedMoE(FusedMoE):
group_list_type = fused_experts_results.group_list_type
assert expert_tokens is not None and group_list_type is not None, \
"expert_tokens and group_list_type should not be None when dynamic_eplb is enabled."
moe_load_stream = moe_load_async_stream()
cur_stream = torch.npu.current_stream()
moe_load_stream.wait_stream(cur_stream)
with npu_stream_switch(moe_load_stream):
self.moe_load += expert_tokens if group_list_type == 1 else \
torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])
cur_stream.wait_stream(moe_load_stream)
self.moe_load += expert_tokens if group_list_type == 1 else \
torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])
routed_out = forward_context.moe_comm_method.finalize(
hidden_states=fused_experts_results.routed_out,