[Feat] shared expert dp for deepseek_mtp (#3811)
### What this PR does / why we need it? Support shared expert DP for deepseek_mtp feature. `shared_expert_dp` requires `SP==True`, with corresponding parameter restrictions. Previously, due to the coupling between `shared_expert_dp` and torchair, and the removal of `deepseek_mtp` in vllm_ascend, shared expert dp of deepseek_mtp was temporarily removed. Currently, by performing the `reduce_scatter` on the input of deepssek_mtp in `mtp_proposer.py`, we ensure that it matches the dimensions of `input_embedding`, and then perform the `all_gather` on the output of mtp. ### How was this patch tested? baseline: <img width="1184" height="692" alt="image" src="https://github.com/user-attachments/assets/9680d53a-7b1d-481a-accc-b8f3dae2b9e3" /> enable shared_expert_dp and multistream_overlap_shared_expert: <img width="1167" height="687" alt="image" src="https://github.com/user-attachments/assets/2531d06b-dfda-4e24-8628-6f4b0f677ddc" /> TPOT: 48ms -> 45.4ms Average TPS per rank: 117.6 -> 126.1 - vLLM version: v0.11.2 - vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.2 --------- Signed-off-by: chenmenglong <chenmenglong1@huawei.com> Signed-off-by: zengran <zengran2@huawei.com> Co-authored-by: zengran <zengran2@huawei.com>
This commit is contained in:
@@ -110,6 +110,7 @@ class AscendRMSNorm(RMSNorm):
|
||||
import torch_npu
|
||||
|
||||
if residual is not None:
|
||||
residual = torch.ops.vllm.maybe_chunk_residual(x, residual)
|
||||
assert x.size(0) == residual.size(0)
|
||||
x, residual = _addrmsnorm_forward_oot(
|
||||
self, x, residual, self.next_need_quant_fusion_linear,
|
||||
|
||||
@@ -2,6 +2,7 @@ import torch
|
||||
import torch.nn.functional as F
|
||||
import torch_npu
|
||||
from vllm.distributed import (get_dp_group, get_ep_group,
|
||||
get_tensor_model_parallel_rank,
|
||||
get_tensor_model_parallel_world_size,
|
||||
tensor_model_parallel_all_gather,
|
||||
tensor_model_parallel_all_reduce,
|
||||
@@ -15,6 +16,27 @@ from vllm_ascend.ops.weight_prefetch import maybe_npu_prefetch
|
||||
from vllm_ascend.utils import npu_stream_switch, prefetch_stream
|
||||
|
||||
|
||||
def _maybe_chunk_residual_impl(x: torch.Tensor,
|
||||
residual: torch.Tensor) -> torch.Tensor:
|
||||
try:
|
||||
forward_context = get_forward_context()
|
||||
except AssertionError:
|
||||
return residual
|
||||
|
||||
if x.size(0) != residual.size(0):
|
||||
sp_enabled = forward_context.sp_enabled
|
||||
assert sp_enabled is True, ("Currently, this situation only occurs "
|
||||
"when sp is enabled")
|
||||
pad_size = forward_context.pad_size
|
||||
if pad_size > 0:
|
||||
residual = F.pad(residual, (0, 0, 0, pad_size))
|
||||
tp_size = get_tensor_model_parallel_world_size()
|
||||
tp_rank = get_tensor_model_parallel_rank()
|
||||
residual = torch.chunk(residual, tp_size, dim=0)[tp_rank]
|
||||
|
||||
return residual
|
||||
|
||||
|
||||
def _maybe_all_gather_and_maybe_unpad_impl(
|
||||
x: torch.Tensor,
|
||||
label: bool,
|
||||
@@ -259,6 +281,12 @@ def _matmul_and_reduce_impl_fake(input_parallel: torch.Tensor,
|
||||
return output
|
||||
|
||||
|
||||
direct_register_custom_op(op_name="maybe_chunk_residual",
|
||||
op_func=_maybe_chunk_residual_impl,
|
||||
fake_impl=lambda x, residual: x,
|
||||
mutates_args=[],
|
||||
dispatch_key="PrivateUse1")
|
||||
|
||||
direct_register_custom_op(op_name="maybe_all_gather_and_maybe_unpad",
|
||||
op_func=_maybe_all_gather_and_maybe_unpad_impl,
|
||||
fake_impl=_maybe_all_gather_and_maybe_unpad_fake,
|
||||
|
||||
Reference in New Issue
Block a user