### What this PR does / why we need it?
This reverts commit
bf87606932.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
E2E vllm serving with `enable_shared_expert_dp: true` in eager mode as
before.
- vLLM version: v0.11.0rc3
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.0
Signed-off-by: linfeng-yuan <1102311262@qq.com>
This commit is contained in:
@@ -1,5 +1,4 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import torch
|
import torch
|
||||||
@@ -54,9 +53,7 @@ class TestAscendRMSNorm(PytestBase):
|
|||||||
# Test case for the most common and basic scenario
|
# Test case for the most common and basic scenario
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"residual", [None, torch.randn(4, 8, dtype=torch.float16)])
|
"residual", [None, torch.randn(4, 8, dtype=torch.float16)])
|
||||||
@patch("torch.ops.vllm.maybe_chunk_residual")
|
def test_forward_oot_basic(self, residual):
|
||||||
def test_forward_oot_basic(self, mock_maybe_chunk_residual, residual):
|
|
||||||
mock_maybe_chunk_residual.side_effect = lambda x, residual: residual
|
|
||||||
layer = RMSNorm(hidden_size=8, eps=1e-05)
|
layer = RMSNorm(hidden_size=8, eps=1e-05)
|
||||||
x = torch.randn(4, 8, dtype=torch.float16)
|
x = torch.randn(4, 8, dtype=torch.float16)
|
||||||
if residual is not None:
|
if residual is not None:
|
||||||
@@ -120,8 +117,6 @@ class TestAscendRMSNorm(PytestBase):
|
|||||||
mock_forward_context.layer_idx = 0
|
mock_forward_context.layer_idx = 0
|
||||||
mock_forward_context.num_hidden_layers = num_hidden_layers
|
mock_forward_context.num_hidden_layers = num_hidden_layers
|
||||||
mock_forward_context.fusion_linear = "gate_up_dense"
|
mock_forward_context.fusion_linear = "gate_up_dense"
|
||||||
mocker.patch("torch.ops.vllm.maybe_chunk_residual",
|
|
||||||
lambda x, residual: residual)
|
|
||||||
|
|
||||||
# Ensure fusion and layer_idx increment are handled correctly
|
# Ensure fusion and layer_idx increment are handled correctly
|
||||||
x = torch.randn(4, 8, dtype=torch.float16)
|
x = torch.randn(4, 8, dtype=torch.float16)
|
||||||
|
|||||||
@@ -1278,8 +1278,7 @@ class AscendMLAImpl(MLAAttentionImpl):
|
|||||||
current_ms_metadata = get_multistream_comm_context()
|
current_ms_metadata = get_multistream_comm_context()
|
||||||
if current_ms_metadata is not None:
|
if current_ms_metadata is not None:
|
||||||
with torch.npu.stream(current_ms_metadata.comm_stream):
|
with torch.npu.stream(current_ms_metadata.comm_stream):
|
||||||
o_proj_input[
|
o_proj_input[num_decode_tokens:] = output_prefill
|
||||||
num_decode_tokens:num_actual_tokens] = output_prefill
|
|
||||||
current_ms_metadata.after_comm_event.record()
|
current_ms_metadata.after_comm_event.record()
|
||||||
else:
|
else:
|
||||||
o_proj_input[
|
o_proj_input[
|
||||||
|
|||||||
@@ -144,17 +144,8 @@ class AscendMultiHeadLatentAttention(MultiHeadLatentAttentionWrapper):
|
|||||||
hidden_states: torch.Tensor,
|
hidden_states: torch.Tensor,
|
||||||
kv_cache: Optional[torch.Tensor] = None,
|
kv_cache: Optional[torch.Tensor] = None,
|
||||||
attn_metadata: Optional[AttentionMetadata] = None) -> torch.Tensor:
|
attn_metadata: Optional[AttentionMetadata] = None) -> torch.Tensor:
|
||||||
forward_context = get_forward_context()
|
need_gather_q_kv = get_forward_context().sp_enabled
|
||||||
sp_enabled = forward_context.sp_enabled
|
output_shape = hidden_states.shape
|
||||||
need_gather_q_kv = False
|
|
||||||
if sp_enabled and self.debug_layer_idx < self.layers:
|
|
||||||
need_gather_q_kv = True
|
|
||||||
if not sp_enabled or self.debug_layer_idx < self.layers:
|
|
||||||
output_shape = hidden_states.shape
|
|
||||||
else:
|
|
||||||
# used in deepseek mtp layer
|
|
||||||
output_shape = torch.chunk(hidden_states, self.tp_size,
|
|
||||||
dim=0)[0].shape
|
|
||||||
# FIXME: This does not seem right, should make sure the buffer is fixed
|
# FIXME: This does not seem right, should make sure the buffer is fixed
|
||||||
output = torch.empty(output_shape,
|
output = torch.empty(output_shape,
|
||||||
dtype=hidden_states.dtype,
|
dtype=hidden_states.dtype,
|
||||||
|
|||||||
@@ -99,7 +99,6 @@ class AscendRMSNorm(RMSNorm):
|
|||||||
import torch_npu
|
import torch_npu
|
||||||
|
|
||||||
if residual is not None:
|
if residual is not None:
|
||||||
residual = torch.ops.vllm.maybe_chunk_residual(x, residual)
|
|
||||||
assert x.size(0) == residual.size(0)
|
assert x.size(0) == residual.size(0)
|
||||||
x, residual = _addrmsnorm_forward_oot(
|
x, residual = _addrmsnorm_forward_oot(
|
||||||
self, x, residual, self.next_need_quant_fusion_linear,
|
self, x, residual, self.next_need_quant_fusion_linear,
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ import torch
|
|||||||
import torch.nn.functional as F
|
import torch.nn.functional as F
|
||||||
import torch_npu
|
import torch_npu
|
||||||
from vllm.distributed import (get_dp_group, get_ep_group,
|
from vllm.distributed import (get_dp_group, get_ep_group,
|
||||||
get_tensor_model_parallel_rank,
|
|
||||||
get_tensor_model_parallel_world_size,
|
get_tensor_model_parallel_world_size,
|
||||||
tensor_model_parallel_all_gather,
|
tensor_model_parallel_all_gather,
|
||||||
tensor_model_parallel_all_reduce,
|
tensor_model_parallel_all_reduce,
|
||||||
@@ -16,27 +15,6 @@ from vllm_ascend.ops.weight_prefetch import maybe_npu_prefetch
|
|||||||
from vllm_ascend.utils import npu_stream_switch, prefetch_stream
|
from vllm_ascend.utils import npu_stream_switch, prefetch_stream
|
||||||
|
|
||||||
|
|
||||||
def _maybe_chunk_residual_impl(x: torch.Tensor,
|
|
||||||
residual: torch.Tensor) -> torch.Tensor:
|
|
||||||
try:
|
|
||||||
forward_context = get_forward_context()
|
|
||||||
except AssertionError:
|
|
||||||
return residual
|
|
||||||
|
|
||||||
if x.size(0) != residual.size(0):
|
|
||||||
sp_enabled = forward_context.sp_enabled
|
|
||||||
assert sp_enabled is True, ("Currently, this situation only occurs "
|
|
||||||
"when sp is enabled")
|
|
||||||
pad_size = forward_context.pad_size
|
|
||||||
if pad_size > 0:
|
|
||||||
residual = F.pad(residual, (0, 0, 0, pad_size))
|
|
||||||
tp_size = get_tensor_model_parallel_world_size()
|
|
||||||
tp_rank = get_tensor_model_parallel_rank()
|
|
||||||
residual = torch.chunk(residual, tp_size, dim=0)[tp_rank]
|
|
||||||
|
|
||||||
return residual
|
|
||||||
|
|
||||||
|
|
||||||
def _maybe_all_gather_and_maybe_unpad_impl(
|
def _maybe_all_gather_and_maybe_unpad_impl(
|
||||||
x: torch.Tensor,
|
x: torch.Tensor,
|
||||||
label: bool,
|
label: bool,
|
||||||
@@ -257,11 +235,6 @@ def _maybe_all_reduce_tensor_model_parallel_impl(
|
|||||||
return tensor_model_parallel_all_reduce(final_hidden_states)
|
return tensor_model_parallel_all_reduce(final_hidden_states)
|
||||||
|
|
||||||
|
|
||||||
direct_register_custom_op(op_name="maybe_chunk_residual",
|
|
||||||
op_func=_maybe_chunk_residual_impl,
|
|
||||||
fake_impl=lambda x, residual: x,
|
|
||||||
mutates_args=[],
|
|
||||||
dispatch_key="PrivateUse1")
|
|
||||||
direct_register_custom_op(op_name="maybe_all_gather_and_maybe_unpad",
|
direct_register_custom_op(op_name="maybe_all_gather_and_maybe_unpad",
|
||||||
op_func=_maybe_all_gather_and_maybe_unpad_impl,
|
op_func=_maybe_all_gather_and_maybe_unpad_impl,
|
||||||
fake_impl=_maybe_all_gather_and_maybe_unpad_fake,
|
fake_impl=_maybe_all_gather_and_maybe_unpad_fake,
|
||||||
|
|||||||
@@ -284,7 +284,7 @@ class NPUPlatform(Platform):
|
|||||||
if parallel_config and parallel_config.worker_cls == "auto":
|
if parallel_config and parallel_config.worker_cls == "auto":
|
||||||
# TODO: this is a tricky way to disable `use_sequence_parallel_moe` in vllm.
|
# TODO: this is a tricky way to disable `use_sequence_parallel_moe` in vllm.
|
||||||
os.environ["VLLM_ALL2ALL_BACKEND"] = "flashinfer_all2allv"
|
os.environ["VLLM_ALL2ALL_BACKEND"] = "flashinfer_all2allv"
|
||||||
if ascend_config.torchair_graph_config.enabled:
|
if ascend_config.torchair_graph_config.enabled or ascend_config.enable_shared_expert_dp:
|
||||||
parallel_config.worker_cls = "vllm_ascend.torchair.torchair_worker.NPUTorchairWorker"
|
parallel_config.worker_cls = "vllm_ascend.torchair.torchair_worker.NPUTorchairWorker"
|
||||||
else:
|
else:
|
||||||
parallel_config.worker_cls = "vllm_ascend.worker.worker_v1.NPUWorker"
|
parallel_config.worker_cls = "vllm_ascend.worker.worker_v1.NPUWorker"
|
||||||
@@ -337,6 +337,8 @@ class NPUPlatform(Platform):
|
|||||||
ascend_config = get_ascend_config()
|
ascend_config = get_ascend_config()
|
||||||
|
|
||||||
if use_mla and ascend_config.enable_shared_expert_dp:
|
if use_mla and ascend_config.enable_shared_expert_dp:
|
||||||
|
if use_mla and not use_sparse:
|
||||||
|
return "vllm_ascend.torchair.torchair_mla.AscendMLATorchairBackend"
|
||||||
if use_mla and use_sparse:
|
if use_mla and use_sparse:
|
||||||
return "vllm_ascend.torchair.torchair_sfa.AscendSFATorchairBackend"
|
return "vllm_ascend.torchair.torchair_sfa.AscendSFATorchairBackend"
|
||||||
|
|
||||||
|
|||||||
@@ -82,7 +82,9 @@ class MtpProposer(Proposer):
|
|||||||
with set_default_torch_dtype(
|
with set_default_torch_dtype(
|
||||||
draft_model_config.dtype), set_current_vllm_config(
|
draft_model_config.dtype), set_current_vllm_config(
|
||||||
self.vllm_config):
|
self.vllm_config):
|
||||||
if self.torchair_graph_enabled:
|
if self.torchair_graph_enabled or (
|
||||||
|
self.enable_shared_expert_dp
|
||||||
|
and self.vllm_config.model_config.use_mla):
|
||||||
self.model = TorchairDeepSeekMTP(
|
self.model = TorchairDeepSeekMTP(
|
||||||
vllm_config=self.vllm_config).to(target_device)
|
vllm_config=self.vllm_config).to(target_device)
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user