diff --git a/vllm_ascend/eplb/eplb_updator.py b/vllm_ascend/eplb/eplb_updator.py
index f5072826..89721702 100644
--- a/vllm_ascend/eplb/eplb_updator.py
+++ b/vllm_ascend/eplb/eplb_updator.py
@@ -23,8 +23,6 @@ from vllm.logger import logger
 
 from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
 from vllm_ascend.eplb.core.eplb_worker import EplbProcess
-from vllm_ascend.eplb.utils import moe_load_async_stream
-from vllm_ascend.utils import npu_stream_switch
 
 
 class EplbUpdator:
@@ -155,22 +153,21 @@ class EplbUpdator:
             self._gather_buffer = None
 
         if dist.is_initialized():
-            with npu_stream_switch(moe_load_async_stream()):
-                self.world_size = dist.get_world_size()
-                self.device = local_load.device
-                if self._gather_buffer is None:
-                    shape = (self.world_size, *local_load.shape)
-                    self._gather_buffer = torch.empty(shape,
-                                                      dtype=local_load.dtype,
-                                                      device=self.device)
+            self.world_size = dist.get_world_size()
+            self.device = local_load.device
+            if self._gather_buffer is None:
+                shape = (self.world_size, *local_load.shape)
+                self._gather_buffer = torch.empty(shape,
+                                                  dtype=local_load.dtype,
+                                                  device=self.device)
 
-                dist.all_gather_into_tensor(self._gather_buffer, local_load)
+            dist.all_gather_into_tensor(self._gather_buffer, local_load)
 
-                moe_load = self._gather_buffer.permute(1, 0, 2)
-                self.shared_dict["moe_load"] = moe_load.cpu()
-                logger.debug(
-                    f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
-                )
+            moe_load = self._gather_buffer.permute(1, 0, 2)
+            self.shared_dict["moe_load"] = moe_load.cpu()
+            logger.debug(
+                f"[ModelRunner] Updated shared_dict['moe_load'] shape={moe_load.shape}"
+            )
         else:
             moe_load = local_load.unsqueeze(1)
             self.shared_dict["moe_load"] = moe_load.cpu()
diff --git a/vllm_ascend/eplb/utils.py b/vllm_ascend/eplb/utils.py
index 6f703f10..0efa623b 100644
--- a/vllm_ascend/eplb/utils.py
+++ b/vllm_ascend/eplb/utils.py
@@ -18,9 +18,6 @@
 import types
 
 import torch
-import torch_npu
-
-_MOE_LOAD_ASYNC_STREAM = None
 
 
 def get_expert_map(self, layer_id):
@@ -78,12 +75,3 @@ def model_register(model, model_config):
         model.num_moe_layers = config.num_hidden_layers - model.num_dense_layers
     else:
         raise NotImplementedError("EPLB is not supported.")
-
-
-def moe_load_async_stream() -> torch_npu.npu.Stream:
-    global _MOE_LOAD_ASYNC_STREAM
-    if _MOE_LOAD_ASYNC_STREAM is None:
-        # when this function is called before any stream is set,
-        # we return the default stream.
-        _MOE_LOAD_ASYNC_STREAM = torch_npu.npu.Stream()
-    return _MOE_LOAD_ASYNC_STREAM
\ No newline at end of file
diff --git a/vllm_ascend/ops/fused_moe/fused_moe.py b/vllm_ascend/ops/fused_moe/fused_moe.py
index f9534515..d2a55e31 100644
--- a/vllm_ascend/ops/fused_moe/fused_moe.py
+++ b/vllm_ascend/ops/fused_moe/fused_moe.py
@@ -32,7 +32,6 @@ from vllm_ascend.ascend_config import get_ascend_config
 from vllm_ascend.ascend_forward_context import MoECommType
 from vllm_ascend.distributed.parallel_state import get_mc2_group
 from vllm_ascend.eplb.core.eplb_utils import init_eplb_config
-from vllm_ascend.eplb.utils import moe_load_async_stream
 from vllm_ascend.flash_common3_context import (get_flash_common3_context,
                                                set_flash_common3_context)
 from vllm_ascend.ops.fused_moe.experts_selector import (select_experts,
@@ -371,13 +370,8 @@ class AscendFusedMoE(FusedMoE):
             group_list_type = fused_experts_results.group_list_type
             assert expert_tokens is not None and group_list_type is not None, \
                 "expert_tokens and group_list_type should not be None when dynamic_eplb is enabled."
-            moe_load_stream = moe_load_async_stream()
-            cur_stream = torch.npu.current_stream()
-            moe_load_stream.wait_stream(cur_stream)
-            with npu_stream_switch(moe_load_stream):
-                self.moe_load += expert_tokens if group_list_type == 1 else \
-                    torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])
-            cur_stream.wait_stream(moe_load_stream)
+            self.moe_load += expert_tokens if group_list_type == 1 else \
+                torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])
 
         routed_out = forward_context.moe_comm_method.finalize(
             hidden_states=fused_experts_results.routed_out,
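
Reviewer note: with the dedicated moe_load stream removed, the `self.moe_load` accumulation in fused_moe.py now runs on the current stream, so no extra wait_stream synchronization is needed. Below is a minimal standalone sketch (illustration only, not part of the patch) of how that update interprets `expert_tokens`; it assumes, as the `torch.cat` expression above implies, that `group_list_type == 1` means per-expert counts and any other value means a cumulative (prefix-sum) count that must be converted to per-expert deltas.

# Sketch of the per-expert load accounting kept by this change (assumption
# about group_list_type semantics noted above).
import torch

def per_expert_load(expert_tokens: torch.Tensor, group_list_type: int) -> torch.Tensor:
    if group_list_type == 1:
        # Already a per-expert token count.
        return expert_tokens
    # Cumulative -> per-expert: keep the first entry, then take successive differences.
    return torch.cat([expert_tokens[:1], expert_tokens[1:] - expert_tokens[:-1]])

cumulative = torch.tensor([3, 7, 7, 12])                 # running totals over experts 0..3
print(per_expert_load(cumulative, 0))                    # tensor([3, 4, 0, 5])
print(per_expert_load(torch.tensor([3, 4, 0, 5]), 1))    # unchanged: already per-expert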