diff --git a/tests/ut/ops/test_prepare_finalize.py b/tests/ut/ops/test_prepare_finalize.py index f0480e1c..35cb01a7 100644 --- a/tests/ut/ops/test_prepare_finalize.py +++ b/tests/ut/ops/test_prepare_finalize.py @@ -6,7 +6,7 @@ from vllm.model_executor.layers.fused_moe import FusedMoEConfig from vllm_ascend.ops.fused_moe.prepare_finalize import ( PrepareAndFinalizeWithAll2All, PrepareAndFinalizeWithAllGather, - PrepareAndFinalizeWithMC2, PrepareAndFinalizeWithNaiveMulticast) + PrepareAndFinalizeWithMC2) class TestPrepareAndFinalize(unittest.TestCase): @@ -222,59 +222,3 @@ class TestPrepareAndFinalize(unittest.TestCase): mock_tp_all_reduce.return_value = result result_with_tp = layer.finalize(h_out, reduce_results=True) self.assertEqual(result_with_tp.shape[0], 3) - - @patch("vllm_ascend.ops.fused_moe.prepare_finalize.get_dp_group") - @patch( - "vllm_ascend.ops.fused_moe.prepare_finalize.tensor_model_parallel_all_reduce" - ) - @patch("vllm_ascend.ops.fused_moe.prepare_finalize.get_forward_context") - def test_naive_multicast_prepare_finalize(self, mock_get_forward_context, - mock_tp_all_reduce, - mock_get_dp_group): - # Mock forward context with DP metadata - mock_context = MagicMock() - mock_context.dp_metadata.cu_tokens_across_sp.return_value = torch.tensor( - [2, 5, 7]) - mock_get_forward_context.return_value = mock_context - - # Setup DP group mock - mock_dp_group = MagicMock() - mock_dp_group.broadcast = MagicMock() - mock_dp_group.all_reduce = MagicMock() - mock_get_dp_group.return_value = mock_dp_group - - # Mock all_reduce to just return input (simulate sum) - def mock_all_reduce(tensor): - return tensor * 2 - - mock_dp_group.all_reduce.side_effect = mock_all_reduce - - # Setup config - self.moe_config.dp_size = 3 - self.moe_config.dp_rank = 1 - self.moe_config.tp_size = 1 - self.moe_config.ep_size = 1 - - layer = PrepareAndFinalizeWithNaiveMulticast(self.moe_config) - - # Local inputs - hidden_states = torch.randn(3, 8) - router_logits = torch.randn(3, 2) - - # Run prepare - h_out, r_out, _, _ = layer.prepare(hidden_states, router_logits) - - # Should be global tensor: [7, 8] and [7, 2] - self.assertEqual(h_out.shape, (7, 8)) - self.assertEqual(r_out.shape, (7, 2)) - - # Run finalize - result = layer.finalize(h_out, reduce_results=False) - - # Should slice back to local: [3, 8] - self.assertEqual(result.shape, (3, 8)) - - # Test with reduce_results=True and TP/EP > 1 - mock_tp_all_reduce.return_value = result - result_with_tp = layer.finalize(h_out, reduce_results=True) - self.assertEqual(result_with_tp.shape, (3, 8)) diff --git a/tests/ut/torchair/ops/test_torchair_fused_moe.py b/tests/ut/torchair/ops/test_torchair_fused_moe.py index 57569a28..cf306d2a 100644 --- a/tests/ut/torchair/ops/test_torchair_fused_moe.py +++ b/tests/ut/torchair/ops/test_torchair_fused_moe.py @@ -23,7 +23,7 @@ from pytest_mock import MockerFixture from vllm.model_executor.layers.fused_moe import FusedMoEMethodBase import vllm_ascend -from vllm_ascend.ascend_forward_context import _get_fused_moe_state +from vllm_ascend.ascend_forward_context import get_fused_moe_state from vllm_ascend.quantization.quant_config import AscendFusedMoEMethod from vllm_ascend.torchair.ops.torchair_fused_moe import ( TorchairAscendFusedMoE, TorchairAscendUnquantizedFusedMoEMethod) @@ -360,7 +360,7 @@ class TestTorchairAscendUnquantizedFusedMoEMethod: global_redundant_expert_num = vllm_ascend.torchair.ops.torchair_fused_moe.get_ascend_config( ).init_redundancy_expert is_deepseek_v3_r1 = global_num_experts - global_redundant_expert_num == 256 - forward_context = MagicMock(fused_moe_state=_get_fused_moe_state( + forward_context = MagicMock(fused_moe_state=get_fused_moe_state( ep_size, is_prefill, is_deepseek_v3_r1)) with patch( "vllm_ascend.torchair.ops.torchair_fused_moe.get_forward_context", @@ -396,7 +396,7 @@ class TestTorchairAscendUnquantizedFusedMoEMethod: ep_size = others_param is_prefill = False forward_context = MagicMock( - fused_moe_state=_get_fused_moe_state(ep_size, is_prefill, True)) + fused_moe_state=get_fused_moe_state(ep_size, is_prefill, True)) with patch("vllm_ascend.torchair.ops.torchair_fused_moe.get_forward_context", return_value=forward_context), \ patch("vllm_ascend.torchair.ops.torchair_fused_moe.get_ascend_soc_version", return_value=AscendSocVersion.A3): expert_map = torch.tensor([0, 1, 2, -1, -1, -1, -1, -1]) diff --git a/tests/ut/worker/test_model_runner_v1.py b/tests/ut/worker/test_model_runner_v1.py index b4eec741..1d781490 100644 --- a/tests/ut/worker/test_model_runner_v1.py +++ b/tests/ut/worker/test_model_runner_v1.py @@ -74,7 +74,7 @@ def test_select_moe_comm_method(soc_version, enable_expert_parallel, # Bind the real method to the mock object method = NPUModelRunner._select_moe_comm_method( - mock_runner, num_tokens, False) + mock_runner, num_tokens) # Assert the result assert method == expected_method @@ -108,4 +108,4 @@ def test_select_moe_comm_method_unsupported_soc(): return_value=True), \ pytest.raises(ValueError, match=f"Unsupported soc_version: {unsupported_soc}"): - NPUModelRunner._select_moe_comm_method(mock_runner, 100, False) + NPUModelRunner._select_moe_comm_method(mock_runner, 100) diff --git a/vllm_ascend/ascend_forward_context.py b/vllm_ascend/ascend_forward_context.py index 8c477dac..11c1d3a0 100644 --- a/vllm_ascend/ascend_forward_context.py +++ b/vllm_ascend/ascend_forward_context.py @@ -29,16 +29,8 @@ class FusedMoEState(Enum): All2AllSeq = 5 -class MoECommType(Enum): - ALLGATHER = 0 - MC2 = 1 - ALLTOALL = 2 - NAIVE_MULTICAST = 3 - - -# TODO(zzzzwwjj): add soc_version to choose branch -def _get_fused_moe_state(ep_size: int, with_prefill: bool, - is_deepseek_v3_r1: bool): +def get_fused_moe_state(ep_size: int, with_prefill: bool, + is_deepseek_v3_r1: bool): # the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep # only supports deepseek v3/r1 if (envs_ascend.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1 @@ -56,6 +48,12 @@ def _get_fused_moe_state(ep_size: int, with_prefill: bool, return FusedMoEState.MC2 +class MoECommType(Enum): + ALLGATHER = 0 + MC2 = 1 + ALLTOALL = 2 + + @contextmanager def set_ascend_forward_context( attn_metadata: Any, @@ -103,8 +101,8 @@ def set_ascend_forward_context( is_deepseek_v3_r1 = hasattr( vllm_config.model_config.hf_config, 'n_routed_experts' ) and vllm_config.model_config.hf_config.n_routed_experts == 256 - fused_moe_state = _get_fused_moe_state(ep_size, with_prefill, - is_deepseek_v3_r1) + fused_moe_state = get_fused_moe_state(ep_size, with_prefill, + is_deepseek_v3_r1) forward_context.fused_moe_state = fused_moe_state forward_context.in_profile_run = in_profile_run diff --git a/vllm_ascend/ops/fused_moe/moe_comm_method.py b/vllm_ascend/ops/fused_moe/moe_comm_method.py index c89eb1df..c48ce1a4 100644 --- a/vllm_ascend/ops/fused_moe/moe_comm_method.py +++ b/vllm_ascend/ops/fused_moe/moe_comm_method.py @@ -27,7 +27,7 @@ from vllm_ascend.ascend_forward_context import MoECommType from vllm_ascend.ops.fused_moe.moe_mlp import unified_apply_mlp from vllm_ascend.ops.fused_moe.prepare_finalize import ( PrepareAndFinalizeWithAll2All, PrepareAndFinalizeWithAllGather, - PrepareAndFinalizeWithMC2, PrepareAndFinalizeWithNaiveMulticast, QuantType) + PrepareAndFinalizeWithMC2, QuantType) from vllm_ascend.ops.fused_moe.token_dispatcher import ( TokenDispatcherWithAll2AllV, TokenDispatcherWithAllGather, TokenDispatcherWithMC2, TokenDispatcherWithMoge) @@ -44,8 +44,6 @@ def setup_moe_comm_method(moe_config): _MoECommMethods[MoECommType.ALLTOALL] = AlltoAllCommImpl(moe_config) _MoECommMethods[MoECommType.ALLGATHER] = AllGatherCommImpl(moe_config) _MoECommMethods[MoECommType.MC2] = MC2CommImpl(moe_config) - _MoECommMethods[MoECommType.NAIVE_MULTICAST] = NaiveMulticastCommImpl( - moe_config) class MoECommMethod(ABC): @@ -245,32 +243,3 @@ class AlltoAllCommImpl(MoECommMethod): def _get_prepare_finalize(self): return PrepareAndFinalizeWithAll2All(self.moe_config) - - -class NaiveMulticastCommImpl(MoECommMethod): - """This implementation is the same as NativeAllGatherCommImpl, - but uses NPU-specific ops for better performance. - - This implementation should be compatible with all scenarios, and - thus it is the default implementation for MoE communication methods. - It uses `torch_npu.npu_moe_init_routing_v2` for pre-processing - and `torch_npu.npu_moe_token_unpermute` for post-processing - to handle the token-to-expert mapping and communication efficiently. - - NOTE(Yizhou): TBH, it is really weird that we were supposed to use - `torch_npu.npu_moe_init_routing_v2` and `torch_npu.npu_moe_finalize_routing` - or `torch_npu.npu_moe_token_permute` and `torch_npu.npu_moe_token_unpermute` - for pre-processing and post-processing, respectively. - But `npu_moe_finalize_routing` will lead to accuracy issues so we have to - use `torch_npu.npu_moe_token_unpermute` instead. - This is a workaround and should be removed after the issue is fixed. - """ - - def _get_token_dispatcher(self): - return TokenDispatcherWithAllGather( - top_k=self.moe_config.experts_per_token, - num_experts=self.moe_config.num_experts, - num_local_experts=self.moe_config.num_local_experts) - - def _get_prepare_finalize(self): - return PrepareAndFinalizeWithNaiveMulticast(self.moe_config) diff --git a/vllm_ascend/ops/fused_moe/prepare_finalize.py b/vllm_ascend/ops/fused_moe/prepare_finalize.py index 46640006..48350ea8 100644 --- a/vllm_ascend/ops/fused_moe/prepare_finalize.py +++ b/vllm_ascend/ops/fused_moe/prepare_finalize.py @@ -45,7 +45,7 @@ class PrepareAndFinalize(ABC): """ Abstract base class for MoE (Mixture-of-Experts) tensor preparation and finalization in distributed environments. Subclasses implement specific communication strategies - (e.g., AllGather, All2All, MC2, Naive Multicast) to handle tensor padding, slicing, + (e.g., AllGather, All2All, MC2) to handle tensor padding, slicing, broadcasting, and reduction across TP/DP/EP groups. Attributes: @@ -454,115 +454,3 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize): hidden_states = tensor_model_parallel_all_reduce(hidden_states) return hidden_states - - -class PrepareAndFinalizeWithNaiveMulticast(PrepareAndFinalize): - """ - MoE communication strategy using Naive Multicast (point-to-point broadcast). - Will be used in prefill when using allgather in decode. Each DP rank broadcasts its slice to all others. - Uses `cu_tokens_across_dp_cpu` (cumulative tokens) to locate slice boundaries. - """ - - def _naive_multicast(self, x: torch.Tensor, - cu_tokens_across_dp_cpu: torch.Tensor): - """ - Naive multicast implementation: - 1. Create global buffer sized by total tokens across DP. - 2. Current rank copies its slice into its designated buffer region. - 3. Each rank broadcasts its slice to all others via P2P. - - Args: - x (torch.Tensor): Local tensor [local_tokens, hidden_size] - cu_tokens_across_dp_cpu (torch.Tensor): Cumulative token counts per DP rank - - Returns: - torch.Tensor: Global tensor [total_tokens, hidden_size] - """ - assert len(x.shape) == 2, "Input must be 2D [tokens, features]" - buffer = torch.empty((cu_tokens_across_dp_cpu[-1], x.size(1)), - device=x.device, - dtype=x.dtype) - - # Copy local slice into buffer - start = 0 if self.moe_config.dp_rank == 0 else cu_tokens_across_dp_cpu[ - self.moe_config.dp_rank - 1] - end = cu_tokens_across_dp_cpu[self.moe_config.dp_rank] - buffer[start:end, :].copy_(x) - - # Broadcast each slice to all ranks - for idx in range(self.moe_config.dp_size): - start = 0 if idx == 0 else cu_tokens_across_dp_cpu[idx - 1] - end = cu_tokens_across_dp_cpu[idx] - get_dp_group().broadcast(buffer[start:end, :], idx) - return buffer - - def prepare( - self, - hidden_states: torch.Tensor, - router_logits: torch.Tensor, - enable_shared_expert_dp: bool = False, - replace_allreduce: bool = False, - quant_type=QuantType.NONE - ) -> tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor], - Optional[torch.Tensor]]: - """ - Preparation steps: - 1. Fetch cumulative token boundaries from forward context. - 2. Multicast hidden_states and router_logits to form global tensors. - - Returns: - Tuple of (global_hidden_states, global_router_logits, None, None) - """ - self.enable_shared_expert_dp = enable_shared_expert_dp - - if self.moe_config.dp_size > 1: - self.cu_tokens_across_dp_cpu = get_forward_context( - ).dp_metadata.cu_tokens_across_sp(1) - hidden_states = self._naive_multicast(hidden_states, - self.cu_tokens_across_dp_cpu) - router_logits = self._naive_multicast(router_logits, - self.cu_tokens_across_dp_cpu) - - if prefill_context_parallel_enable() and self.moe_config.pcp_size > 1: - hidden_states = get_pcp_group().all_gather( - hidden_states, - dim=0, - ) - router_logits = get_pcp_group().all_gather( - router_logits, - dim=0, - ) - - return hidden_states, router_logits, None, None - - def finalize(self, - hidden_states: torch.Tensor, - reduce_results: bool, - context_metadata: Optional[dict] = None) -> torch.Tensor: - """ - Finalization steps: - 1. If DP > 1 and not shared expert: - - All-reduce across DP - - Slice to current rank's token range using cu_tokens_across_dp_cpu - 2. If `reduce_results=True` and TP/EP > 1, apply tensor_model_parallel_all_reduce. - - Returns: - Tensor with shape [local_num_tokens, hidden_size] - """ - if self.moe_config.dp_size > 1 and not self.enable_shared_expert_dp: - start = 0 if self.moe_config.dp_rank == 0 else self.cu_tokens_across_dp_cpu[ - self.moe_config.dp_rank - 1] - end = self.cu_tokens_across_dp_cpu[self.moe_config.dp_rank] - hidden_states = get_dp_group().all_reduce( - hidden_states) # Sum across DP - hidden_states = hidden_states[start:end, :] - - if prefill_context_parallel_enable() and self.moe_config.pcp_size > 1: - hidden_states = get_pcp_group().reduce_scatter(hidden_states, - dim=0) - - if reduce_results and (self.moe_config.tp_size > 1 - or self.moe_config.ep_size > 1): - hidden_states = tensor_model_parallel_all_reduce(hidden_states) - - return hidden_states diff --git a/vllm_ascend/spec_decode/eagle_proposer.py b/vllm_ascend/spec_decode/eagle_proposer.py index d3be2ea9..4d076ac1 100644 --- a/vllm_ascend/spec_decode/eagle_proposer.py +++ b/vllm_ascend/spec_decode/eagle_proposer.py @@ -124,8 +124,7 @@ class EagleProposer(Proposer): num_tokens_across_dp: Optional[torch.Tensor] = None, aclgraph_runtime_mode: CUDAGraphMode = CUDAGraphMode.NONE, batch_descriptor=None): - moe_comm_type = self.runner._select_moe_comm_method( - num_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_tokens) with set_ascend_forward_context(None, self.vllm_config, moe_comm_type=moe_comm_type, @@ -460,11 +459,7 @@ class EagleProposer(Proposer): else: num_input_tokens = num_tokens - with_prefill = attn_metadata.attn_state not in [ - AscendAttentionState.DecodeOnly, AscendAttentionState.SpecDecoding - ] - moe_comm_type = self.runner._select_moe_comm_method( - num_input_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_input_tokens) # copy inputs to buffer for cudagraph self.positions[:num_tokens] = target_positions.to(device) @@ -504,8 +499,7 @@ class EagleProposer(Proposer): else: input_batch_size = batch_size - moe_comm_type = self.runner._select_moe_comm_method( - input_batch_size, False) + moe_comm_type = self.runner._select_moe_comm_method(input_batch_size) attn_metadata.num_actual_tokens = batch_size attn_metadata.max_query_len = 1 diff --git a/vllm_ascend/spec_decode/mtp_proposer.py b/vllm_ascend/spec_decode/mtp_proposer.py index 7aa9b729..556a917f 100644 --- a/vllm_ascend/spec_decode/mtp_proposer.py +++ b/vllm_ascend/spec_decode/mtp_proposer.py @@ -223,8 +223,7 @@ class MtpProposer(Proposer): with_prefill, ) = self.runner._sync_metadata_across_dp(num_tokens, with_prefill) - moe_comm_type = self.runner._select_moe_comm_method( - num_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_tokens) if skip_attn: attn_metadata = None @@ -672,8 +671,7 @@ class MtpProposer(Proposer): with_prefill) = self.runner._sync_metadata_across_dp( num_input_tokens, self.runner.with_prefill) - moe_comm_type = self.runner._select_moe_comm_method( - num_input_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_input_tokens) if scheduler_output: max_query_len = common_attn_metadata.max_query_len diff --git a/vllm_ascend/torchair/torchair_mtp_proposer.py b/vllm_ascend/torchair/torchair_mtp_proposer.py index 183e0da2..b816b8d8 100644 --- a/vllm_ascend/torchair/torchair_mtp_proposer.py +++ b/vllm_ascend/torchair/torchair_mtp_proposer.py @@ -81,8 +81,7 @@ class TorchairMtpProposer(MtpProposer): num_tokens_across_dp=None, aclgraph_runtime_mode: CUDAGraphMode = CUDAGraphMode.NONE, batch_descriptor=None) -> None: - moe_comm_type = self.runner._select_moe_comm_method( - num_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_tokens) if not with_prefill: skip_attn = False @@ -342,8 +341,7 @@ class TorchairMtpProposer(MtpProposer): num_tokens_across_dp = self.runner.num_tokens_across_dp with_prefill = self.runner.with_prefill - moe_comm_type = self.runner._select_moe_comm_method( - num_input_tokens, with_prefill) + moe_comm_type = self.runner._select_moe_comm_method(num_input_tokens) batch_descriptor = BatchDescriptor(num_tokens=num_input_tokens, uniform_decode=False) aclgraph_runtime_mode, batch_descriptor = \ diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 124102f5..8f103ddc 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -2192,8 +2192,8 @@ class NPUModelRunner(LoRAModelRunnerMixin): kv_connector_output=kv_connector_output, ) - def _select_moe_comm_method(self, num_tokens: int, - with_prefill: bool) -> Optional[MoECommType]: + def _select_moe_comm_method(self, + num_tokens: int) -> Optional[MoECommType]: """1. If expert parallel is not enabled, we use all-gather since MC2 and all-to-all are designed for expert parallelism. 2. If expert parallel is enabled, we need to consider the soc version and the @@ -2244,12 +2244,6 @@ class NPUModelRunner(LoRAModelRunnerMixin): else: raise ValueError(f"Unsupported soc_version: {soc_version}") - if moe_comm_type == MoECommType.ALLGATHER and with_prefill: - if enable_sp(): - moe_comm_type = MoECommType.ALLGATHER - else: - moe_comm_type = MoECommType.NAIVE_MULTICAST - # PanguProMoE only supports allgather if model_type == "PanguProMoE": moe_comm_type = MoECommType.ALLGATHER @@ -2289,8 +2283,7 @@ class NPUModelRunner(LoRAModelRunnerMixin): if self.dynamic_eplb: self.eplb_updator.take_update_info_from_eplb_process() - moe_comm_type = self._select_moe_comm_method(num_input_tokens, - self.with_prefill) + moe_comm_type = self._select_moe_comm_method(num_input_tokens) uniform_decode = (max_query_len == self.uniform_decode_query_len) and ( scheduler_output.total_num_scheduled_tokens @@ -2823,7 +2816,7 @@ class NPUModelRunner(LoRAModelRunnerMixin): with_prefill) = self._sync_metadata_across_dp(num_tokens, with_prefill) - moe_comm_type = self._select_moe_comm_method(num_tokens, with_prefill) + moe_comm_type = self._select_moe_comm_method(num_tokens) # If cudagraph_mode.decode_mode() == FULL and # cudagraph_mode.seperate_routine(). This means that we are using @@ -2999,8 +2992,7 @@ class NPUModelRunner(LoRAModelRunnerMixin): # allowing vLLM to correctly estimate the maximum memory required. if self.max_num_tokens > self.mc2_tokens_capacity and \ self._select_moe_comm_method( - self.mc2_tokens_capacity, - with_prefill=True) == MoECommType.MC2: + self.mc2_tokens_capacity) == MoECommType.MC2: self._dummy_run(self.mc2_tokens_capacity, with_prefill=True) output = None