[Bugfix] [MoE] fix error in deepseek when using allgather (#3824)

### What this PR does / why we need it?
After refactoring vllm_ascend/models and FusedMoE, we are unable to pass
`gate` from deepseekv2.py to `AscendFusedMoE.forward`, which will result
in error when running deepseek v3/r1 with allgather.
Hence, this pr removes `gate` related computations from FusedMoE module
in eager/aclgraph mode.
### Does this PR introduce _any_ user-facing change?
`rm_router_logits` is deprecated in eager/aclgraph.
### How was this patch tested?
e2e & ut

- vLLM version: v0.11.0rc3
- vLLM main:
https://github.com/vllm-project/vllm/commit/releases/v0.11.1

Signed-off-by: Pr0Wh1teGivee <calvin_zhu0210@outlook.com>
This commit is contained in:
weichen
2025-10-29 14:51:39 +08:00
committed by GitHub
parent 900086fdc6
commit 0d1859af08
7 changed files with 56 additions and 85 deletions

View File

@@ -67,7 +67,7 @@ class TestMoECommMethod(TestBase):
# Verify prepare was called with correct arguments
mock_pf_instance.prepare.assert_called_once_with(
hidden_states, router_logits, False, False, None)
hidden_states, router_logits, False, False)
# Test finalize method
comm_impl.finalize(h_out,
@@ -115,7 +115,7 @@ class TestMoECommMethod(TestBase):
# Verify prepare was called with correct arguments
mock_pf_instance.prepare.assert_called_once_with(
hidden_states, router_logits, False, False, None)
hidden_states, router_logits, False, False)
# Test finalize method
comm_impl.finalize(h_out,
@@ -165,7 +165,7 @@ class TestMoECommMethod(TestBase):
# Verify prepare was called with correct arguments
mock_pf_instance.prepare.assert_called_once_with(
hidden_states, router_logits, False, False, None)
hidden_states, router_logits, False, False)
@patch("vllm_ascend.ops.fused_moe.moe_comm_method.get_current_vllm_config")
@patch("vllm_ascend.ops.fused_moe.moe_comm_method.get_forward_context")

View File

@@ -199,13 +199,8 @@ class TestPrepareAndFinalize(unittest.TestCase):
hidden_states = torch.randn(3, 8)
router_logits = torch.randn(3, 2)
# Mock the gate function for rm_router_logits=False case
mock_gate = MagicMock()
mock_gate.return_value = (router_logits.repeat(2, 1), None)
h_out, r_out, _, context_metadata = layer.prepare(hidden_states,
router_logits,
gate=mock_gate)
h_out, r_out, _, context_metadata = layer.prepare(
hidden_states, router_logits)
# After all-gather with DP=2, should double the batch size
self.assertEqual(h_out.shape[0], 12)
@@ -266,14 +261,8 @@ class TestPrepareAndFinalize(unittest.TestCase):
hidden_states = torch.randn(3, 8)
router_logits = torch.randn(3, 2)
# Mock gate for router logits recomputation
mock_gate = MagicMock()
mock_gate.return_value = (torch.randn(7, 2), None)
# Run prepare
h_out, r_out, _, _ = layer.prepare(hidden_states,
router_logits,
gate=mock_gate)
h_out, r_out, _, _ = layer.prepare(hidden_states, router_logits)
# Should be global tensor: [7, 8] and [7, 2]
self.assertEqual(h_out.shape, (7, 8))

View File

@@ -64,13 +64,12 @@ class MoECommMethod(ABC):
hidden_states: torch.Tensor,
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
replace_allreduce: bool = False
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
hidden_states, router_logits, mc2_mask, context_metadata = self.prepare_finalize.prepare(
hidden_states, router_logits, enable_shared_expert_dp,
replace_allreduce, gate)
replace_allreduce)
return hidden_states, router_logits, mc2_mask, context_metadata
def finalize(self,

View File

@@ -27,7 +27,7 @@ from vllm.distributed.parallel_state import (
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.fused_moe import FusedMoEConfig
from vllm_ascend.utils import enable_sp, get_rm_router_logits_state
from vllm_ascend.utils import enable_sp
class PrepareAndFinalize(ABC):
@@ -44,10 +44,6 @@ class PrepareAndFinalize(ABC):
def __init__(self, moe_config: FusedMoEConfig):
self.moe_config = moe_config
is_deepseek_v3_r1 = self.moe_config.original_num_experts == 256
self.rm_router_logits = get_rm_router_logits_state(
self.moe_config.ep_size, self.moe_config.dp_size,
is_deepseek_v3_r1)
@abstractmethod
def prepare(
@@ -55,8 +51,7 @@ class PrepareAndFinalize(ABC):
hidden_states: torch.Tensor,
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
replace_allreduce: bool = False
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -64,14 +59,12 @@ class PrepareAndFinalize(ABC):
- Padding to align communication boundaries
- Slicing across tensor-parallel ranks
- Broadcasting across data-parallel ranks
- Recomputing router logits if needed
Args:
hidden_states (torch.Tensor): Input features, shape [num_tokens, hidden_size]
router_logits (torch.Tensor): Router outputs, shape [num_tokens, num_experts]
enable_shared_expert_dp (bool): Skip DP communication for shared experts
replace_allreduce (bool): Bypass default all-reduce behavior
gate (nn.Module, optional): Gate network to recompute router_logits if needed
Returns:
Tuple of:
@@ -124,8 +117,7 @@ class PrepareAndFinalizeWithAll2All(PrepareAndFinalize):
hidden_states: torch.Tensor,
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
replace_allreduce: bool = False
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -221,8 +213,7 @@ class PrepareAndFinalizeWithMC2(PrepareAndFinalizeWithAll2All):
hidden_states: torch.Tensor,
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
replace_allreduce: bool = False
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -303,7 +294,6 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize):
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -318,7 +308,7 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize):
return self._prepare_with_dp_group(hidden_states, router_logits,
enable_shared_expert_dp,
replace_allreduce, gate)
replace_allreduce)
def _prepare_with_ep_group(
self,
@@ -339,7 +329,6 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize):
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -361,18 +350,14 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize):
if pad_size > 0:
hidden_states = nn.functional.pad(hidden_states,
(0, 0, 0, pad_size))
if not self.rm_router_logits:
router_logits = nn.functional.pad(router_logits,
(0, 0, 0, pad_size))
router_logits = nn.functional.pad(router_logits,
(0, 0, 0, pad_size))
# All-gather across DP group
hidden_states = self.moe_config.dp_group.all_gather(
hidden_states, 0)
if self.rm_router_logits:
router_logits, _ = gate(hidden_states) # Recompute globally
else:
router_logits = self.moe_config.dp_group.all_gather(
router_logits, 0)
router_logits = self.moe_config.dp_group.all_gather(
router_logits, 0)
return hidden_states, router_logits, None, None
def finalize(self,
@@ -474,8 +459,7 @@ class PrepareAndFinalizeWithNaiveMulticast(PrepareAndFinalize):
hidden_states: torch.Tensor,
router_logits: torch.Tensor,
enable_shared_expert_dp: bool = False,
replace_allreduce: bool = False,
gate=None
replace_allreduce: bool = False
) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor],
Optional[torch.Tensor]]:
"""
@@ -493,11 +477,8 @@ class PrepareAndFinalizeWithNaiveMulticast(PrepareAndFinalize):
).dp_metadata.cu_tokens_across_sp(1)
hidden_states = self._naive_multicast(hidden_states,
self.cu_tokens_across_dp_cpu)
if self.rm_router_logits:
router_logits, _ = gate(hidden_states)
else:
router_logits = self._naive_multicast(
router_logits, self.cu_tokens_across_dp_cpu)
router_logits = self._naive_multicast(router_logits,
self.cu_tokens_across_dp_cpu)
return hidden_states, router_logits, None, None

View File

@@ -48,12 +48,12 @@ from vllm_ascend.eplb.core.eplb_utils import (determine_default_expert_map,
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
from vllm_ascend.quantization.quant_config import AscendFusedMoEMethod
from vllm_ascend.torchair.ops.sequence_parallel import MetadataForPadding
from vllm_ascend.torchair.utils import (npu_stream_switch, npu_wait_tensor,
from vllm_ascend.torchair.utils import (get_all_reduce_merge_state,
get_rm_router_logits_state,
npu_stream_switch, npu_wait_tensor,
super_kernel)
from vllm_ascend.utils import (AscendSocVersion, dispose_tensor,
get_all_reduce_merge_state,
get_ascend_soc_version,
get_rm_router_logits_state, is_310p,
get_ascend_soc_version, is_310p,
is_hierarchical_communication_enabled,
vllm_version_is)

View File

@@ -15,6 +15,8 @@ try:
except ImportError:
from torchair.ops import NpuStreamSwitch as _npu_stream_switch
from torchair.ops import npu_wait_tensor as _npu_wait_tensor
import vllm_ascend.envs as envs_ascend
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_enable_nz
KV_CACHE_BYTES_CACHE_PATH_NAME = ".kv_cache_bytes"
@@ -238,3 +240,33 @@ def torchair_ops_patch():
def super_kernel(prefix: str, option: str, enabled: bool = True):
return _super_kernel(prefix, option) if enabled else nullcontext()
# TODO(ttanzhiqiang): rm_router_logits
# dp>1 will trigger
# In theory, this solution is only applicable to AllGather and AllGatherEP, because in the dp scenario, the previous operation was gate + two communications, and now it is changed to one communication + gate operation, which can save some communication time. In theory, all moe AllGather and AllGatherEP solutions can follow this logic, but now other moe models (qwen3-235b) dp solutions are not adjusted, so use the switch to control it to prevent code errors.
def get_rm_router_logits_state(ep_size: int, dp_size: int,
is_deepseek_v3_r1: bool):
# the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep
# only supports deepseek v3/r1
if dp_size > 1:
if (envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1
and is_deepseek_v3_r1):
return True
elif ep_size == 1 and is_deepseek_v3_r1:
return True
return False
# TODO(ttanzhiqiang): all_reduce merge
# When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
# Currently, all_reduce_merge is enabled by default in the AllGather, AllGatherEP and NaiveMulticast scenarios of the deepseek model.
def get_all_reduce_merge_state(ep_size: int, is_deepseek_v3_r1: bool):
# the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep
# only supports deepseek v3/r1
if (envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1
and is_deepseek_v3_r1):
return True
elif ep_size == 1 and is_deepseek_v3_r1:
return True
return False

View File

@@ -535,36 +535,6 @@ class ProfileExecuteDuration:
return durations
# TODO(ttanzhiqiang): rm_router_logits
# dp>1 will trigger
# In theory, this solution is only applicable to AllGather and AllGatherEP, because in the dp scenario, the previous operation was gate + two communications, and now it is changed to one communication + gate operation, which can save some communication time. In theory, all moe AllGather and AllGatherEP solutions can follow this logic, but now other moe models (qwen3-235b) dp solutions are not adjusted, so use the switch to control it to prevent code errors.
def get_rm_router_logits_state(ep_size: int, dp_size: int,
is_deepseek_v3_r1: bool):
# the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep
# only supports deepseek v3/r1
if dp_size > 1:
if (envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1
and is_deepseek_v3_r1):
return True
elif ep_size == 1 and is_deepseek_v3_r1:
return True
return False
# TODO(ttanzhiqiang): all_reduce merge
# When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
# Currently, all_reduce_merge is enabled by default in the AllGather, AllGatherEP and NaiveMulticast scenarios of the deepseek model.
def get_all_reduce_merge_state(ep_size: int, is_deepseek_v3_r1: bool):
# the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep
# only supports deepseek v3/r1
if (envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1
and is_deepseek_v3_r1):
return True
elif ep_size == 1 and is_deepseek_v3_r1:
return True
return False
def register_ascend_customop(vllm_config: Optional[VllmConfig] = None):
"""Register Ascend CustomOP