Delete redundant codes related to communication (#2717)
### What this PR does / why we need it?
Delete redundant codes related to communication
### Does this PR introduce _any_ user-facing change?
not involve
### How was this patch tested?
not involve
- vLLM version: v0.10.1.1
- vLLM main:
6c7af8110a
---------
Signed-off-by: 刘哲续 <liuzhexu1@huawei.com>
Co-authored-by: 刘哲续 <liuzhexu1@huawei.com>
This commit is contained in:
@@ -139,7 +139,6 @@ def mock_dist_env(mocker: MockerFixture):
|
|||||||
patch('torch.distributed.all_gather'), \
|
patch('torch.distributed.all_gather'), \
|
||||||
patch('torch.distributed.all_to_all_single'), \
|
patch('torch.distributed.all_to_all_single'), \
|
||||||
patch('vllm_ascend.ops.fused_moe.tensor_model_parallel_all_reduce'), \
|
patch('vllm_ascend.ops.fused_moe.tensor_model_parallel_all_reduce'), \
|
||||||
patch('vllm_ascend.ops.fused_moe.data_parallel_reduce_scatter'), \
|
|
||||||
patch('vllm.model_executor.layers.fused_moe.config.get_dp_group',
|
patch('vllm.model_executor.layers.fused_moe.config.get_dp_group',
|
||||||
return_value=mock_dp_and_tp_group(mocker)), \
|
return_value=mock_dp_and_tp_group(mocker)), \
|
||||||
patch('vllm_ascend.ops.fused_moe.get_ascend_config',
|
patch('vllm_ascend.ops.fused_moe.get_ascend_config',
|
||||||
|
|||||||
@@ -66,8 +66,6 @@ def mock_dist_env(mocker: MockerFixture):
|
|||||||
patch('torch.distributed.all_to_all_single', return_value=torch.randn(8, 32)), \
|
patch('torch.distributed.all_to_all_single', return_value=torch.randn(8, 32)), \
|
||||||
patch('vllm_ascend.torchair.ops.torchair_fused_moe.tensor_model_parallel_all_reduce',
|
patch('vllm_ascend.torchair.ops.torchair_fused_moe.tensor_model_parallel_all_reduce',
|
||||||
return_value=torch.randn(5, 32)), \
|
return_value=torch.randn(5, 32)), \
|
||||||
patch('vllm_ascend.torchair.ops.torchair_fused_moe.data_parallel_reduce_scatter',
|
|
||||||
return_value=torch.randn(5, 32)), \
|
|
||||||
patch('vllm.model_executor.layers.fused_moe.config.get_dp_group',
|
patch('vllm.model_executor.layers.fused_moe.config.get_dp_group',
|
||||||
return_value=mock_dp_and_tp_group(mocker)), \
|
return_value=mock_dp_and_tp_group(mocker)), \
|
||||||
patch('vllm_ascend.torchair.ops.torchair_fused_moe.get_ascend_config',
|
patch('vllm_ascend.torchair.ops.torchair_fused_moe.get_ascend_config',
|
||||||
|
|||||||
@@ -1,25 +0,0 @@
|
|||||||
#
|
|
||||||
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
# This file is a part of the vllm-ascend project.
|
|
||||||
#
|
|
||||||
|
|
||||||
import torch
|
|
||||||
from vllm.distributed.parallel_state import get_dp_group
|
|
||||||
|
|
||||||
|
|
||||||
def data_parallel_reduce_scatter(input_: torch.Tensor,
|
|
||||||
dim: int = -1) -> torch.Tensor:
|
|
||||||
"""Reduce-Scatter the input tensor across data parallel group."""
|
|
||||||
return get_dp_group().reduce_scatter(input_, dim)
|
|
||||||
@@ -7,12 +7,11 @@ import torch.nn as nn
|
|||||||
import torch_npu
|
import torch_npu
|
||||||
from vllm.distributed import tensor_model_parallel_all_reduce
|
from vllm.distributed import tensor_model_parallel_all_reduce
|
||||||
from vllm.distributed.parallel_state import (
|
from vllm.distributed.parallel_state import (
|
||||||
get_tensor_model_parallel_rank, get_tensor_model_parallel_world_size)
|
get_dp_group, get_tensor_model_parallel_rank,
|
||||||
|
get_tensor_model_parallel_world_size)
|
||||||
from vllm.forward_context import get_forward_context
|
from vllm.forward_context import get_forward_context
|
||||||
from vllm.model_executor.layers.fused_moe import FusedMoEConfig
|
from vllm.model_executor.layers.fused_moe import FusedMoEConfig
|
||||||
|
|
||||||
from vllm_ascend.distributed.communication_op import \
|
|
||||||
data_parallel_reduce_scatter
|
|
||||||
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
||||||
from vllm_ascend.utils import AscendSocVersion, get_ascend_soc_version
|
from vllm_ascend.utils import AscendSocVersion, get_ascend_soc_version
|
||||||
|
|
||||||
@@ -147,7 +146,7 @@ class AllGatherCommImpl(MoECommMethod):
|
|||||||
When TP size > 1, all-reduce the hidden states to get the final output.
|
When TP size > 1, all-reduce the hidden states to get the final output.
|
||||||
"""
|
"""
|
||||||
if self.moe_config.dp_size > 1:
|
if self.moe_config.dp_size > 1:
|
||||||
hidden_states = data_parallel_reduce_scatter(hidden_states, dim=0)
|
hidden_states = get_dp_group().reduce_scatter(hidden_states, 0)
|
||||||
hidden_states = hidden_states[:self.num_tokens]
|
hidden_states = hidden_states[:self.num_tokens]
|
||||||
|
|
||||||
if reduce_results and (self.moe_config.tp_size > 1
|
if reduce_results and (self.moe_config.tp_size > 1
|
||||||
|
|||||||
@@ -40,8 +40,6 @@ from vllm.model_executor.layers.quantization.base_config import \
|
|||||||
|
|
||||||
from vllm_ascend.ascend_config import get_ascend_config
|
from vllm_ascend.ascend_config import get_ascend_config
|
||||||
from vllm_ascend.ascend_forward_context import FusedMoEState
|
from vllm_ascend.ascend_forward_context import FusedMoEState
|
||||||
from vllm_ascend.distributed.communication_op import \
|
|
||||||
data_parallel_reduce_scatter
|
|
||||||
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
||||||
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
|
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
|
||||||
from vllm_ascend.ops.layers.experts_selector import select_experts
|
from vllm_ascend.ops.layers.experts_selector import select_experts
|
||||||
@@ -537,8 +535,8 @@ class AscendFusedMoE(FusedMoE):
|
|||||||
final_hidden_states = final_hidden_states[start:end, :]
|
final_hidden_states = final_hidden_states[start:end, :]
|
||||||
dispose_tensor(e_hidden_states)
|
dispose_tensor(e_hidden_states)
|
||||||
elif fused_moe_state == FusedMoEState.AllGather:
|
elif fused_moe_state == FusedMoEState.AllGather:
|
||||||
final_hidden_states = data_parallel_reduce_scatter(
|
final_hidden_states = get_dp_group().reduce_scatter(
|
||||||
e_hidden_states, dim=0)
|
e_hidden_states, 0)
|
||||||
final_hidden_states = final_hidden_states[:num_tokens]
|
final_hidden_states = final_hidden_states[:num_tokens]
|
||||||
dispose_tensor(e_hidden_states)
|
dispose_tensor(e_hidden_states)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -40,8 +40,6 @@ from vllm.model_executor.layers.quantization.base_config import \
|
|||||||
|
|
||||||
from vllm_ascend.ascend_config import get_ascend_config
|
from vllm_ascend.ascend_config import get_ascend_config
|
||||||
from vllm_ascend.ascend_forward_context import FusedMoEState
|
from vllm_ascend.ascend_forward_context import FusedMoEState
|
||||||
from vllm_ascend.distributed.communication_op import \
|
|
||||||
data_parallel_reduce_scatter
|
|
||||||
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
from vllm_ascend.distributed.parallel_state import get_mc2_group
|
||||||
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
|
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
|
||||||
from vllm_ascend.ops.sequence_parallel import MetadataForPadding
|
from vllm_ascend.ops.sequence_parallel import MetadataForPadding
|
||||||
@@ -1269,8 +1267,8 @@ class TorchairAscendFusedMoE(FusedMoE):
|
|||||||
final_hidden_states = final_hidden_states[start:end, :]
|
final_hidden_states = final_hidden_states[start:end, :]
|
||||||
dispose_tensor(e_hidden_states)
|
dispose_tensor(e_hidden_states)
|
||||||
elif fused_moe_state == FusedMoEState.AllGather:
|
elif fused_moe_state == FusedMoEState.AllGather:
|
||||||
final_hidden_states = data_parallel_reduce_scatter(
|
final_hidden_states = get_dp_group().reduce_scatter(
|
||||||
e_hidden_states, dim=0)
|
e_hidden_states, 0)
|
||||||
final_hidden_states = final_hidden_states[:num_tokens]
|
final_hidden_states = final_hidden_states[:num_tokens]
|
||||||
dispose_tensor(e_hidden_states)
|
dispose_tensor(e_hidden_states)
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user