[refactor] refactor deepseek-related files (#2849)

### What this PR does / why we need it?
This PR deletes ~2K lines of code about deepseek modeling. It falls back
CustomDeepseekV2 modules to original vllm implementations and adapts
some modifications in vllm about deepseek and moe.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
E2E  vllm serving with torchair graph mode and eager mode.

- vLLM version: v0.10.2
- vLLM main:
759ef49b15

---------

Signed-off-by: linfeng-yuan <1102311262@qq.com>
Signed-off-by: Yizhou Liu <liu_yizhou@outlook.com>
Co-authored-by: yiz-liu <136800916+yiz-liu@users.noreply.github.com>
Co-authored-by: Yizhou Liu <liu_yizhou@outlook.com>
This commit is contained in:
linfeng-yuan
2025-09-16 14:13:07 +08:00
committed by GitHub
parent 18ca7861f6
commit 1c5900327b
18 changed files with 295 additions and 1899 deletions

View File

@@ -14,14 +14,24 @@ def test_e2e_ep_correctness(model_name):
]
max_tokens = 5
with VllmRunner(model_name, tensor_parallel_size=2,
enforce_eager=True) as vllm_model:
# FIXME: Really strange that chunked prefill might lead to different results, investigate further
with VllmRunner(
model_name,
tensor_parallel_size=2,
additional_config={"ascend_scheduler_config": {
"enabled": True
}},
enforce_eager=True) as vllm_model:
tp_output = vllm_model.generate_greedy(example_prompts, max_tokens)
with VllmRunner(model_name,
tensor_parallel_size=2,
enable_expert_parallel=True,
enforce_eager=True) as vllm_model:
with VllmRunner(
model_name,
tensor_parallel_size=2,
enable_expert_parallel=True,
additional_config={"ascend_scheduler_config": {
"enabled": True
}},
enforce_eager=True) as vllm_model:
ep_output = vllm_model.generate_greedy(example_prompts, max_tokens)
check_outputs_equal(

View File

@@ -22,6 +22,8 @@ Run `pytest tests/multicard/test_torchair_graph_mode.py`.
import os
from typing import Dict
import pytest
from tests.e2e.conftest import VllmRunner
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
@@ -153,6 +155,7 @@ def _pangu_torchair_test_fixture(
print(f"Generated text: {vllm_output[i][1]!r}")
@pytest.mark.skip("skipping test_e2e_pangu_with_torchair")
def test_e2e_pangu_with_torchair():
additional_config = {
"torchair_graph_config": {

114
tests/ut/models/conftest.py Normal file
View File

@@ -0,0 +1,114 @@
from types import SimpleNamespace
from unittest.mock import MagicMock, Mock, patch
import pytest
import torch
from transformers import PretrainedConfig
from vllm.config import CacheConfig, EPLBConfig, ParallelConfig
from vllm.distributed.parallel_state import GroupCoordinator
@pytest.fixture
def base_config():
config = PretrainedConfig(
hidden_size=128,
num_attention_heads=8,
num_hidden_layers=2,
intermediate_size=256,
hidden_act="silu",
rms_norm_eps=1e-6,
rope_theta=10000.0,
max_position_embeddings=2048,
n_routed_experts=4,
n_shared_experts=1,
moe_intermediate_size=256,
num_experts_per_tok=2,
routed_scaling_factor=1.0,
first_k_dense_replace=0,
moe_layer_freq=1,
kv_lora_rank=16,
qk_nope_head_dim=16,
qk_rope_head_dim=16,
v_head_dim=32,
topk_method="noaux_tc",
scoring_func="softmax",
norm_topk_prob=True,
n_group=1,
topk_group=1,
vocab_size=10000,
)
return config
@pytest.fixture
def vllm_config(base_config):
model_config = SimpleNamespace(
hf_config=base_config,
tensor_parallel_size=1,
dtype=torch.float32,
use_mla=True,
quant_config=None,
max_model_len=2048,
)
parallel_config = MagicMock(spec=ParallelConfig)
eplb_config = MagicMock(spec=EPLBConfig)
eplb_config.num_redundant_experts = 0
parallel_config.eplb_config = eplb_config
cache_config = CacheConfig()
vllm_config = Mock()
vllm_config.model_config = model_config
vllm_config.cache_config = cache_config
vllm_config.quant_config = None
vllm_config.parallel_config = parallel_config
return vllm_config
@pytest.fixture
def mock_distributed():
tp_group = Mock(spec=GroupCoordinator)
tp_group.rank_in_group = 0
tp_group.world_size = 1
tp_group.device_group = Mock()
dp_group = Mock(spec=GroupCoordinator)
dp_group.rank_in_group = 0
dp_group.world_size = 1
ep_group = Mock(spec=GroupCoordinator)
ep_group.rank_in_group = 0
ep_group.world_size = 1
ep_group.device_group = Mock()
ep_group.device_group.rank.return_value = 0
ep_group.device_group.size.return_value = 1
pp_group = Mock(spec=GroupCoordinator)
pp_group.rank_in_group = 0
pp_group.world_size = 1
mock_vllm_config = Mock()
mock_vllm_config.scheduler_config = Mock(max_num_seqs=256)
mock_vllm_config.model_config = Mock(max_model_len=2048, quant_config=None)
with patch("vllm_ascend.models.deepseek_v2.get_tensor_model_parallel_rank", return_value=0), \
patch("vllm_ascend.models.deepseek_v2.get_tensor_model_parallel_world_size", return_value=1), \
patch("vllm_ascend.models.deepseek_v2.get_tp_group", return_value=tp_group), \
patch("vllm_ascend.models.deepseek_v2.get_pp_group", return_value=pp_group), \
patch("vllm_ascend.models.deepseek_v2.get_pp_group",
return_value=Mock(is_first_rank=False, is_last_rank=False)), \
patch("vllm_ascend.ops.fused_moe.get_current_vllm_config", return_value=mock_vllm_config), \
patch("vllm_ascend.ops.moe.token_dispatcher.torch.distributed.get_rank", return_value=0), \
patch("vllm_ascend.ops.moe.token_dispatcher.get_ascend_soc_version", return_value=None), \
patch.dict("vllm.distributed.parallel_state.__dict__", _TP=tp_group, _EP=ep_group, _DP=dp_group,
_PP=pp_group), \
patch.dict("vllm_ascend.distributed.parallel_state.__dict__", _MC2=ep_group), \
patch("torch.npu.current_device", return_value=0):
yield
@pytest.fixture
def mock_forward_context():
forward_context = Mock(in_profile_run=False, with_prefill=False)
with patch("vllm_ascend.models.deepseek_v2.get_forward_context",
return_value=forward_context):
yield

View File

@@ -13,10 +13,13 @@ from vllm_ascend.models.deepseek_mtp import (
class TestCustomDeepSeekMultiTokenPredictorLayer(PytestBase):
@pytest.fixture
def setup_mtp_layer(self, mocker: MockerFixture):
def setup_mtp_layer(self, mocker: MockerFixture, vllm_config: VllmConfig,
mock_distributed):
config = PretrainedConfig(vocab_size=1000,
hidden_size=768,
rms_norm_eps=1e-5)
mocker.patch("vllm_ascend.models.deepseek_mtp.get_current_vllm_config",
return_value=vllm_config)
mocker.patch(
"vllm.model_executor.layers.vocab_parallel_embedding.VocabParallelEmbedding.__init__",
return_value=None)
@@ -29,15 +32,15 @@ class TestCustomDeepSeekMultiTokenPredictorLayer(PytestBase):
"vllm_ascend.models.deepseek_mtp.CustomDeepSeekShareHead.__init__",
return_value=None)
mocker_deepseek_v2_decode_layer = mocker.patch(
"vllm_ascend.models.deepseek_v2.CustomDeepseekV2DecoderLayer.__init__",
"vllm.model_executor.models.deepseek_v2.DeepseekV2DecoderLayer.__init__",
return_value=None)
mocker.patch(
"vllm_ascend.ops.vocab_parallel_embedding.AscendVocabParallelEmbedding.__init__",
return_value=None)
mocker.patch("vllm_ascend.utils.get_ascend_config",
mocker.patch("vllm_ascend.models.deepseek_v2.get_ascend_config",
return_value=mocker.Mock())
mtp_layer = CustomDeepSeekMultiTokenPredictorLayer(config, "", None)
mtp_layer = CustomDeepSeekMultiTokenPredictorLayer(config, "0", None)
mocker_deepseek_v2_decode_layer.assert_called_once()
return mtp_layer

View File

@@ -12,163 +12,19 @@
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
from types import SimpleNamespace
from unittest.mock import Mock, patch
import pytest
import torch
from transformers import PretrainedConfig
from vllm.config import CacheConfig
from vllm.distributed.parallel_state import GroupCoordinator
from vllm.model_executor.layers.logits_processor import LogitsProcessor
from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead
from vllm_ascend.models.deepseek_v2 import (
CustomDeepseekV2MergedReplicatedLinear, CustomDeepseekV2MLAAttention,
CustomDeepseekV2MLP, CustomDeepseekV2RowParallelLinear,
CustomDeepseekV2RowParallelLinearReplaceAllreduce,
CustomDeepseekV2SiluAndMul, LogitsProcessor, ParallelLMHead)
from vllm_ascend.models.deepseek_v2 import (CustomDeepseekV2MLAAttention,
CustomDeepseekV2RowParallelLinear)
@pytest.fixture
def base_config():
config = PretrainedConfig(
hidden_size=128,
num_attention_heads=8,
num_hidden_layers=2,
intermediate_size=256,
hidden_act="silu",
rms_norm_eps=1e-6,
rope_theta=10000.0,
max_position_embeddings=2048,
n_routed_experts=4,
n_shared_experts=1,
moe_intermediate_size=256,
num_experts_per_tok=2,
routed_scaling_factor=1.0,
first_k_dense_replace=0,
moe_layer_freq=1,
kv_lora_rank=16,
qk_nope_head_dim=16,
qk_rope_head_dim=16,
v_head_dim=32,
topk_method="noaux_tc",
scoring_func="softmax",
norm_topk_prob=True,
n_group=1,
topk_group=1,
vocab_size=10000,
)
return config
@pytest.fixture
def vllm_config(base_config):
model_config = SimpleNamespace(
hf_config=base_config,
tensor_parallel_size=1,
dtype=torch.float32,
use_mla=False,
quant_config=None,
max_model_len=2048,
)
cache_config = CacheConfig()
vllm_config = Mock()
vllm_config.model_config = model_config
vllm_config.cache_config = cache_config
vllm_config.quant_config = None
return vllm_config
@pytest.fixture
def mock_distributed():
tp_group = Mock(spec=GroupCoordinator)
tp_group.rank_in_group = 0
tp_group.world_size = 1
tp_group.device_group = Mock()
dp_group = Mock(spec=GroupCoordinator)
dp_group.rank_in_group = 0
dp_group.world_size = 1
ep_group = Mock(spec=GroupCoordinator)
ep_group.rank_in_group = 0
ep_group.world_size = 1
pp_group = Mock(spec=GroupCoordinator)
pp_group.rank_in_group = 0
pp_group.world_size = 1
mock_vllm_config = Mock()
mock_vllm_config.scheduler_config = Mock(max_num_seqs=256)
mock_vllm_config.model_config = Mock(max_model_len=2048, quant_config=None)
with patch("vllm_ascend.models.deepseek_v2.get_tensor_model_parallel_rank", return_value=0), \
patch("vllm_ascend.models.deepseek_v2.get_tensor_model_parallel_world_size", return_value=1), \
patch("vllm_ascend.models.deepseek_v2.get_tp_group", return_value=tp_group), \
patch("vllm_ascend.models.deepseek_v2.get_ep_group", return_value=ep_group), \
patch("vllm_ascend.models.deepseek_v2.get_dp_group", return_value=dp_group), \
patch("vllm_ascend.models.deepseek_v2.get_pp_group", return_value=pp_group), \
patch("vllm_ascend.models.deepseek_v2.get_pp_group",
return_value=Mock(is_first_rank=False, is_last_rank=False)), \
patch("vllm_ascend.ops.fused_moe.get_current_vllm_config", return_value=mock_vllm_config), \
patch.dict("vllm.distributed.parallel_state.__dict__", _TP=tp_group, _EP=ep_group, _DP=dp_group,
_PP=pp_group), \
patch.dict("vllm_ascend.distributed.parallel_state.__dict__", _MC2=ep_group), \
patch("torch.npu.current_device", return_value=0):
yield
@pytest.fixture
def mock_forward_context():
forward_context = Mock(in_profile_run=False, with_prefill=False)
with patch("vllm_ascend.models.deepseek_v2.get_forward_context",
return_value=forward_context):
yield
def test_custom_deepseek_v2_silu_and_mul():
torch.set_default_device("cpu")
silu = CustomDeepseekV2SiluAndMul()
assert silu.weight_scale is None
x = torch.randn(2, 4)
output = silu.forward_oot(x)
assert output.shape == (2, 2)
weight_scale = Mock(return_value=torch.tensor(0.1))
silu = CustomDeepseekV2SiluAndMul(weight_scale=weight_scale)
quant_x = torch.randint(-128, 127, (2, 4), dtype=torch.int32)
dynamic_scale = torch.randn(2, 1)
with patch("torch_npu.npu_dequant_swiglu_quant",
return_value=torch.randn(2, 4)):
output = silu.forward_oot((quant_x, dynamic_scale))
assert output.shape == (2, 4)
def test_custom_deepseek_v2_merged_replicated_linear(mock_distributed):
linear = CustomDeepseekV2MergedReplicatedLinear(input_size=128,
output_sizes=[64, 64],
bias=False,
quant_config=None)
assert linear.output_sizes == [64, 64]
param = Mock()
param.data = torch.zeros(128, 128)
param.output_dim = 1
param.is_gguf_weight = False
param.is_gguf_weight_type = False
loaded_weight = torch.randn(128, 64)
linear.weight_loader(param, loaded_weight, loaded_shard_id=0)
with pytest.raises(AssertionError):
linear.weight_loader(param, torch.randn(128, 32), loaded_shard_id=0)
@pytest.mark.parametrize("cls", [
CustomDeepseekV2RowParallelLinearReplaceAllreduce,
CustomDeepseekV2RowParallelLinear
])
@pytest.mark.parametrize("cls", [CustomDeepseekV2RowParallelLinear])
def test_row_parallel_linear(cls, mock_distributed):
linear = cls(input_size=128, output_size=64, bias=False, quant_config=None)
linear.quant_method = Mock()
@@ -185,33 +41,6 @@ def test_row_parallel_linear(cls, mock_distributed):
assert output[0].shape == (2, 4, 64)
def test_custom_deepseek_v2_mlp(mock_distributed, base_config):
mlp = CustomDeepseekV2MLP(hidden_size=128,
intermediate_size=256,
hidden_act="silu",
quant_config=None)
assert isinstance(mlp.act_fn, CustomDeepseekV2SiluAndMul)
x = torch.randn(2, 4, 128)
output = mlp(x)
assert output.shape == (2, 4, 128)
with patch("vllm_ascend.models.deepseek_v2.QuantizationConfig"
) as mock_quant_config:
mock_quant_config.name = "w8a8dynamic"
with pytest.raises(NotImplementedError):
CustomDeepseekV2MLP(hidden_size=128,
intermediate_size=256,
hidden_act="silu",
quant_config=mock_quant_config,
force_replicate=False)
with pytest.raises(ValueError):
CustomDeepseekV2MLP(hidden_size=128,
intermediate_size=256,
hidden_act="relu",
quant_config=None)
@patch("torch_npu.npu_rms_norm")
def test_custom_deepseek_v2_mla_attention(mock_rms_norm, mock_distributed,
base_config):

View File

@@ -75,7 +75,7 @@ class TestLoadWeight(TestBase):
with patch.object(AscendFusedMoE, "__init__",
lambda self, *args, **kwargs: None):
moe = AscendFusedMoE(num_experts=4, top_k=2, hidden_size=8)
moe.hidden_size = 8
expert_data = torch.randn(128, 8)
loaded_weight = torch.randn(128, 4)
moe._load_w13(expert_data, 1, "w1", loaded_weight, 0)

View File

@@ -36,6 +36,7 @@ class TestNPUPlatform(TestBase):
mock_ascend_config = MagicMock()
mock_ascend_config.torchair_graph_config.enabled = False
mock_ascend_config.ascend_scheduler_config.enabled = False
mock_ascend_config.enable_shared_expert_dp = False
return mock_ascend_config
def setUp(self):
@@ -479,6 +480,7 @@ class TestNPUPlatform(TestBase):
def test_get_attn_backend_cls_use_v1_and_mla(self, mock_get_ascend_config):
mock_config = MagicMock()
mock_config.torchair_graph_config.enabled = False
mock_config.enable_shared_expert_dp = False
mock_get_ascend_config.return_value = mock_config

View File

@@ -4,10 +4,6 @@ import vllm_ascend.envs as envs_ascend
def register_model():
ModelRegistry.register_model(
"DeepSeekMTPModel",
"vllm_ascend.models.deepseek_mtp:CustomDeepSeekMTP")
ModelRegistry.register_model(
"Qwen2VLForConditionalGeneration",
"vllm_ascend.models.qwen2_vl:AscendQwen2VLForConditionalGeneration")
@@ -23,22 +19,17 @@ def register_model():
"vllm_ascend.models.qwen2_5_vl_without_padding:AscendQwen2_5_VLForConditionalGeneration_Without_Padding"
)
if envs_ascend.VLLM_ASCEND_ENABLE_DBO:
ModelRegistry.register_model(
"DeepseekV2ForCausalLM",
"vllm_ascend.models.deepseek_dbo:CustomDeepseekDBOForCausalLM")
ModelRegistry.register_model(
"DeepseekV2ForCausalLM",
"vllm_ascend.models.deepseek_v2:CustomDeepseekV2ForCausalLM")
ModelRegistry.register_model(
"DeepseekV3ForCausalLM",
"vllm_ascend.models.deepseek_dbo:CustomDeepseekDBOForCausalLM")
else:
ModelRegistry.register_model(
"DeepseekV2ForCausalLM",
"vllm_ascend.models.deepseek_v2:CustomDeepseekV2ForCausalLM")
ModelRegistry.register_model(
"DeepseekV3ForCausalLM",
"vllm_ascend.models.deepseek_v3:CustomDeepseekV3ForCausalLM")
ModelRegistry.register_model(
"DeepseekV3ForCausalLM",
"vllm_ascend.models.deepseek_v3:CustomDeepseekV3ForCausalLM")
ModelRegistry.register_model(
"DeepSeekMTPModel",
"vllm_ascend.models.deepseek_mtp:CustomDeepSeekMTP")
ModelRegistry.register_model(
"Qwen3MoeForCausalLM",

File diff suppressed because it is too large Load Diff

View File

@@ -23,7 +23,8 @@ import torch
import torch.nn as nn
from transformers import PretrainedConfig
from vllm.attention.backends.abstract import AttentionMetadata
from vllm.config import CacheConfig, ModelConfig, VllmConfig
from vllm.config import (CacheConfig, ModelConfig, VllmConfig,
get_current_vllm_config)
from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.logits_processor import LogitsProcessor
from vllm.model_executor.layers.quantization import QuantizationConfig
@@ -33,12 +34,11 @@ from vllm.model_executor.layers.vocab_parallel_embedding import (
from vllm.model_executor.models.deepseek_mtp import (
DeepSeekMTP, DeepSeekMultiTokenPredictor, DeepSeekMultiTokenPredictorLayer,
SharedHead)
from vllm.model_executor.models.deepseek_v2 import DeepseekV2DecoderLayer
from vllm.model_executor.models.utils import maybe_prefix
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.sequence import IntermediateTensors
from .deepseek_v2 import CustomDeepseekV2DecoderLayer
class CustomDeepSeekShareHead(SharedHead):
@@ -65,6 +65,7 @@ class CustomDeepSeekMultiTokenPredictorLayer(DeepSeekMultiTokenPredictorLayer):
quant_config: Optional[QuantizationConfig] = None,
) -> None:
nn.Module.__init__(self)
vllm_config = get_current_vllm_config()
self.enorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.hnorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
@@ -75,10 +76,8 @@ class CustomDeepSeekMultiTokenPredictorLayer(DeepSeekMultiTokenPredictorLayer):
quant_config=quant_config,
prefix=maybe_prefix(
prefix, "shared_head"))
self.mtp_block = CustomDeepseekV2DecoderLayer(config, prefix,
model_config,
cache_config,
quant_config)
self.mtp_block = DeepseekV2DecoderLayer(vllm_config=vllm_config,
prefix=prefix)
def forward(
self,
@@ -103,8 +102,6 @@ class CustomDeepSeekMultiTokenPredictorLayer(DeepSeekMultiTokenPredictorLayer):
hidden_states, residual = self.mtp_block(positions=positions,
hidden_states=hidden_states,
kv_cache=kv_cache,
attn_metadata=attn_metadata,
residual=None)
hidden_states = residual + hidden_states
return hidden_states

View File

@@ -25,161 +25,42 @@
# # vllm-project/vllm/vllm/model_executor/models/deepseek_v2.py
# """Inference-only DeepseekV2/DeepseekV3 model."""
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from typing import Any, Dict, Iterable, Optional, Union
import torch
import torch_npu
from torch import nn
from transformers import PretrainedConfig
from vllm.attention import AttentionMetadata
from vllm.config import CacheConfig, ModelConfig, VllmConfig
from vllm.config import CacheConfig, VllmConfig
from vllm.distributed import (get_pp_group, get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
get_tp_group, split_tensor_along_last_dim,
tensor_model_parallel_all_reduce,
tensor_model_parallel_reduce_scatter)
from vllm.distributed.parallel_state import get_dp_group, get_ep_group
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.activation import SiluAndMul
tensor_model_parallel_all_reduce)
from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.linear import (ColumnParallelLinear,
MergedColumnParallelLinear,
ReplicatedLinear,
RowParallelLinear,
UnquantizedLinearMethod)
RowParallelLinear)
from vllm.model_executor.layers.logits_processor import LogitsProcessor
from vllm.model_executor.layers.mla import MultiHeadLatentAttention
from vllm.model_executor.layers.quantization import QuantizationConfig
from vllm.model_executor.layers.rotary_embedding import get_rope
from vllm.model_executor.layers.sampler import get_sampler
from vllm.model_executor.layers.vocab_parallel_embedding import (
ParallelLMHead, VocabParallelEmbedding)
from vllm.model_executor.layers.vocab_parallel_embedding import ParallelLMHead
from vllm.model_executor.model_loader.weight_utils import (
default_weight_loader, maybe_remap_kv_scale_name)
from vllm.model_executor.models.deepseek_v2 import \
DeepseekV2ForCausalLM # noqa: E501
from vllm.model_executor.models.deepseek_v2 import \
yarn_get_mscale # noqa: E501
from vllm.model_executor.models.deepseek_v2 import (
DeepseekV2Attention, DeepseekV2DecoderLayer, DeepseekV2MLAAttention,
from vllm.model_executor.models.deepseek_v2 import ( # noqa: E501
DeepseekV2Attention, DeepseekV2DecoderLayer, DeepseekV2ForCausalLM,
DeepseekV2MLAAttention, DeepseekV2MLP, DeepseekV2Model, DeepseekV2MoE,
get_spec_layer_idx_from_weight_name)
from vllm.model_executor.models.utils import (
PPMissingLayer, is_pp_missing_parameter,
make_empty_intermediate_tensors_factory, make_layers, maybe_prefix)
from vllm.sequence import IntermediateTensors
from vllm.model_executor.models.utils import (PPMissingLayer,
is_pp_missing_parameter,
maybe_prefix)
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.models.layers.mla import AscendMLAModules
from vllm_ascend.ops.fused_moe import AscendFusedMoE
from vllm_ascend.quantization.quant_config import AscendLinearMethod
from vllm_ascend.quantization.w8a8_dynamic import AscendW8A8DynamicLinearMethod
from vllm_ascend.utils import dispose_tensor
class CustomDeepseekV2SiluAndMul(SiluAndMul):
def __init__(self,
*,
weight_scale: Optional[Callable[[], torch.Tensor]] = None):
super().__init__()
self.weight_scale = weight_scale
def forward_oot(self, x: Union[torch.Tensor, Tuple[torch.Tensor,
torch.Tensor]]):
if isinstance(x, tuple):
assert self.weight_scale is not None
# For AscendW8A8DynamicLinearMethod:
# a dynamic scale is passed along with the quantized value.
quantized_x, dynamic_scale = x
return torch_npu.npu_dequant_swiglu_quant(
x=quantized_x,
weight_scale=self.weight_scale(),
activation_scale=dynamic_scale,
activate_left=True,
quant_mode=1)
else:
return super().forward_oot(x)
class CustomDeepseekV2MergedReplicatedLinear(ReplicatedLinear):
def __init__(
self,
input_size: int,
output_sizes: list[int],
bias: bool = True,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
):
self.output_sizes = output_sizes
super().__init__(input_size,
sum(output_sizes),
bias=bias,
quant_config=quant_config,
prefix=prefix)
def weight_loader(self, param: torch.nn.Parameter,
loaded_weight: torch.Tensor, loaded_shard_id: int):
# With no support for GGUF format yet.
assert not getattr(param, "is_gguf_weight", False)
assert not getattr(param, "is_gguf_weight_type", False)
assert loaded_shard_id < len(self.output_sizes)
shard_offset = sum(self.output_sizes[:loaded_shard_id])
shard_size = self.output_sizes[loaded_shard_id]
shard = param.data.narrow(param.output_dim, shard_offset, shard_size)
assert shard.size() == loaded_weight.size(), (
f"Tried to load weights of size {loaded_weight.size()}"
f"to a parameter shard of id {loaded_shard_id} size {shard.size()}"
)
shard.copy_(loaded_weight)
class CustomDeepseekV2RowParallelLinearReplaceAllreduce(RowParallelLinear):
def forward(
self,
input_,
is_prefill=True,
is_force_scatter=False
) -> Union[torch.Tensor, tuple[torch.Tensor, Optional[nn.Parameter]]]:
if self.input_is_parallel:
input_parallel = input_
else:
tp_rank = get_tensor_model_parallel_rank()
splitted_input = split_tensor_along_last_dim(
input_, num_partitions=self.tp_size)
input_parallel = splitted_input[tp_rank].contiguous()
# Matrix multiply.
assert self.quant_method is not None
# Only fuse bias add into GEMM for rank 0 (this ensures that
# bias will not get added more than once in TP>1 case)
bias_ = None if (self.tp_rank > 0 or self.skip_bias_add) else self.bias
output_parallel = self.quant_method.apply(self,
input_parallel,
bias=bias_)
if self.reduce_results and self.tp_size > 1:
num_tokens = output_parallel.shape[0]
if is_force_scatter and num_tokens % self.tp_size:
output_parallel = nn.functional.pad(
output_parallel, (0, 0, 0, -num_tokens % self.tp_size))
if is_force_scatter or (not is_prefill
and output_parallel.shape[0] % self.tp_size
== 0):
output = tensor_model_parallel_reduce_scatter(output_parallel,
dim=0)
else:
output = tensor_model_parallel_all_reduce(output_parallel)
else:
output = output_parallel
output_bias = self.bias if self.skip_bias_add else None
if not self.return_bias:
return output
return output, output_bias
class CustomDeepseekV2RowParallelLinear(RowParallelLinear):
@@ -218,205 +99,6 @@ class CustomDeepseekV2RowParallelLinear(RowParallelLinear):
return output, output_bias
class CustomDeepseekV2MLP(nn.Module):
def __init__(
self,
hidden_size: int,
intermediate_size: int,
hidden_act: str,
quant_config: Optional[QuantizationConfig] = None,
reduce_results: bool = True,
force_replicate: bool = False,
prefix: str = "",
) -> None:
super().__init__()
if not force_replicate:
self.gate_up_proj = MergedColumnParallelLinear(
hidden_size, [intermediate_size] * 2,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.gate_up_proj")
self.down_proj = RowParallelLinear(intermediate_size,
hidden_size,
bias=False,
quant_config=quant_config,
reduce_results=reduce_results,
prefix=f"{prefix}.down_proj")
else:
self.gate_up_proj = CustomDeepseekV2MergedReplicatedLinear(
hidden_size, [intermediate_size] * 2,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.gate_up_proj")
self.down_proj = ReplicatedLinear(intermediate_size,
hidden_size,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.down_proj")
if hidden_act != "silu":
raise ValueError(f"Unsupported activation: {hidden_act}. "
"Only silu is supported for now.")
quant_method = self.gate_up_proj.quant_method
if isinstance(quant_method, UnquantizedLinearMethod):
self.act_fn = CustomDeepseekV2SiluAndMul()
elif (isinstance(quant_method, AscendLinearMethod) and isinstance(
quant_method.quant_method, AscendW8A8DynamicLinearMethod)):
# TODO(sdmyzlp): Currently preserved as before:
# 1. The only quantization supported for silu is W8A8Dynamic
# 2. Output dtype of gate_up/down is fixed to be int32/bfloat16
#
# Maybe one can implement a better and more general configuration
# scheme, e.g. by somehow passing around the tweaked `quant_config`
self.act_fn = CustomDeepseekV2SiluAndMul(
# Use lazy binding, for `weight_scale_fp32` is accessible
# only after `process_weights_after_loading`.
weight_scale=lambda: self.gate_up_proj.weight_scale_fp32)
# To be consumed by AscendW8A8DynamicLinearMethod.apply()
self.gate_up_proj._ascend_quant_config = {
"output_dtype": torch.int32,
"pertoken_scale": False,
"return_scale": True,
}
self.down_proj._ascend_quant_config = {
"output_dtype": torch.bfloat16,
"pertoken_scale": True,
"return_scale": False,
}
else:
raise NotImplementedError(
f"Quantization with [{type(quant_method)}] is NOT supported")
def forward(self, x):
gate_up, _ = self.gate_up_proj(x)
x = self.act_fn(gate_up)
x, _ = self.down_proj(x)
return x
class CustomDeepseekV2MoE(nn.Module):
top_k: int
def __init__(
self,
config: PretrainedConfig,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
):
super().__init__()
self.tp_size = get_tensor_model_parallel_world_size()
self.routed_scaling_factor = config.routed_scaling_factor
self.n_shared_experts = config.n_shared_experts
if self.tp_size > config.n_routed_experts:
raise ValueError(
f"Tensor parallel size {self.tp_size} is greater than "
f"the number of experts {config.n_routed_experts}.")
if config.hidden_act != "silu":
raise ValueError(f"Unsupported activation: {config.hidden_act}. "
"Only silu is supported for now.")
ascend_config = get_ascend_config()
self.torchair_graph_enabled = ascend_config.torchair_graph_config.enabled
self.enable_multistream_moe = \
ascend_config.torchair_graph_config.enable_multistream_moe and \
self.torchair_graph_enabled
self.gate = ReplicatedLinear(config.hidden_size,
config.n_routed_experts,
bias=False,
quant_config=None,
prefix=f"{prefix}.gate")
if config.topk_method == "noaux_tc":
self.gate.e_score_correction_bias = nn.Parameter(
torch.empty(config.n_routed_experts))
else:
self.gate.e_score_correction_bias = None
self.experts = AscendFusedMoE(
num_experts=config.n_routed_experts,
top_k=config.num_experts_per_tok,
hidden_size=config.hidden_size,
intermediate_size=config.moe_intermediate_size,
reduce_results=False,
renormalize=config.norm_topk_prob,
quant_config=quant_config,
use_grouped_topk=True,
num_expert_group=config.n_group,
topk_group=config.topk_group,
prefix=f"{prefix}.experts",
scoring_func=config.scoring_func,
e_score_correction_bias=self.gate.e_score_correction_bias)
if config.n_shared_experts is not None:
self.all_reduce_merge = self.experts.all_reduce_merge
reduce_results = not self.all_reduce_merge
intermediate_size = (config.moe_intermediate_size *
config.n_shared_experts)
enable_shared_expert_dp = ascend_config.enable_shared_expert_dp
self.shared_experts = CustomDeepseekV2MLP(
hidden_size=config.hidden_size,
intermediate_size=intermediate_size,
hidden_act=config.hidden_act,
quant_config=quant_config,
reduce_results=reduce_results,
force_replicate=self.enable_multistream_moe
or enable_shared_expert_dp,
prefix=f"{prefix}.shared_experts",
)
else:
self.shared_experts = None # type: ignore
CustomDeepseekV2MoE.top_k = config.num_experts_per_tok
self.dp_size = get_dp_group().world_size
self.tp_group = get_tp_group().device_group
self.tp_rank = get_tp_group().rank_in_group
self.ep_group = get_ep_group()
self.params_dtype = torch.get_default_dtype()
self.rm_router_logits = self.experts.rm_router_logits
def forward(self,
hidden_states: torch.Tensor,
attn_metadata: Optional[AttentionMetadata] = None,
replace_allreduce: bool = False) -> torch.Tensor:
forward_context = get_forward_context()
# when profile runs, force experts to load balanced tokens
# to avoid high memory consumption on a single rank.
enable_force_load_balance = forward_context.in_profile_run
is_prefill = forward_context.with_prefill
# router_logits: (num_tokens, n_experts)
router_logits = None
if not self.rm_router_logits and not self.enable_multistream_moe:
router_logits, _ = self.gate(hidden_states)
experts_hidden_states = self.experts(
hidden_states=hidden_states,
router_logits=router_logits,
is_prefill=is_prefill,
top_k=CustomDeepseekV2MoE.top_k,
enable_force_load_balance=enable_force_load_balance,
shared_experts=self.shared_experts,
gate=self.gate,
replace_allreduce=replace_allreduce)
hidden_states = (
experts_hidden_states[0] * self.routed_scaling_factor +
experts_hidden_states[1])
if self.all_reduce_merge:
# When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce
hidden_states = tensor_model_parallel_all_reduce(hidden_states)
return hidden_states
class CustomDeepseekV2MLAAttention(DeepseekV2MLAAttention):
def __init__(
@@ -499,23 +181,12 @@ class CustomDeepseekV2MLAAttention(DeepseekV2MLAAttention):
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.kv_b_proj")
if (config.n_routed_experts is not None
and self.debug_layer_idx >= config.first_k_dense_replace
and self.debug_layer_idx % config.moe_layer_freq == 0
and self.enable_shared_expert_dp):
self.o_proj = CustomDeepseekV2RowParallelLinearReplaceAllreduce(
self.num_heads * self.v_head_dim,
self.hidden_size,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.o_proj")
else:
self.o_proj = CustomDeepseekV2RowParallelLinear(
self.num_heads * self.v_head_dim,
self.hidden_size,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.o_proj")
self.o_proj = CustomDeepseekV2RowParallelLinear(
self.num_heads * self.v_head_dim,
self.hidden_size,
bias=False,
quant_config=quant_config,
prefix=f"{prefix}.o_proj")
if rope_scaling:
rope_scaling["rope_type"] = 'deepseek_yarn'
@@ -575,15 +246,14 @@ class CustomDeepseekV2MLAAttention(DeepseekV2MLAAttention):
class CustomDeepseekV2DecoderLayer(DeepseekV2DecoderLayer):
def __init__(
self,
config: PretrainedConfig,
prefix: str,
model_config: ModelConfig,
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
) -> None:
def __init__(self, vllm_config: VllmConfig, prefix: str) -> None:
nn.Module.__init__(self)
config = vllm_config.model_config.hf_config
model_config = vllm_config.model_config
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
parallel_config = vllm_config.parallel_config
self.hidden_size = config.hidden_size
rope_theta = getattr(config, "rope_theta", 10000)
rope_scaling = getattr(config, "rope_scaling", None)
@@ -596,7 +266,6 @@ class CustomDeepseekV2DecoderLayer(DeepseekV2DecoderLayer):
self.layers = config.num_hidden_layers
self.tp_size = get_tensor_model_parallel_world_size()
self.tp_rank = get_tp_group().rank_in_group
ascend_config = get_ascend_config()
# TODO: enable mla in vllm-ascend
if model_config.use_mla:
attn_cls = CustomDeepseekV2MLAAttention
@@ -623,13 +292,18 @@ class CustomDeepseekV2DecoderLayer(DeepseekV2DecoderLayer):
if (config.n_routed_experts is not None
and layer_idx >= config.first_k_dense_replace
and layer_idx % config.moe_layer_freq == 0):
self.mlp = CustomDeepseekV2MoE(
self.mlp = DeepseekV2MoE(
config=config,
parallel_config=parallel_config,
quant_config=quant_config,
prefix=f"{prefix}.mlp",
)
if self.mlp.gate.e_score_correction_bias is not None:
self.mlp.gate.e_score_correction_bias.data = (
self.mlp.gate.e_score_correction_bias.data.to(
dtype=torch.get_default_dtype()))
else:
self.mlp = CustomDeepseekV2MLP(
self.mlp = DeepseekV2MLP(
hidden_size=config.hidden_size,
intermediate_size=config.intermediate_size,
hidden_act=config.hidden_act,
@@ -643,185 +317,6 @@ class CustomDeepseekV2DecoderLayer(DeepseekV2DecoderLayer):
self.routed_scaling_factor = config.routed_scaling_factor
self.first_k_dense_replace = config.first_k_dense_replace
self.tp_group = get_tp_group().device_group
self.enable_shared_expert_dp = ascend_config.enable_shared_expert_dp
def forward(
self,
positions: torch.Tensor,
hidden_states: torch.Tensor,
residual: Optional[torch.Tensor],
kv_cache: Optional[torch.Tensor] = None,
attn_metadata: Optional[AttentionMetadata] = None,
replace_allreduce: bool = False,
) -> torch.Tensor:
# Self Attention
if residual is None:
residual = hidden_states
hidden_states = self.input_layernorm(hidden_states)
else:
previous_hidden_states, previous_residual = hidden_states, residual
hidden_states, residual = self.input_layernorm(
hidden_states, residual)
# Dispose hidden_states and residual from the previous layer
# to save npu memory because they're no longer used.
dispose_tensor(previous_hidden_states)
dispose_tensor(previous_residual)
hidden_states = self.self_attn(
positions=positions,
hidden_states=hidden_states,
kv_cache=kv_cache,
attn_metadata=attn_metadata,
)
if hidden_states.dtype == torch.float16:
# Fix FP16 overflow
# We scale both hidden_states and residual before
# rmsnorm, and rmsnorm result would not affect by scale.
hidden_states *= 1. / self.routed_scaling_factor
if self.layer_idx == 0:
# The residual is shared by all layers, we only scale it on
# first layer.
residual *= 1. / self.routed_scaling_factor
tp_size = get_tensor_model_parallel_world_size()
if self.enable_shared_expert_dp and (
self.layer_idx == self.first_k_dense_replace
or self.layer_idx == self.layers) and tp_size > 1:
num_tokens, _ = residual.shape
if num_tokens % tp_size:
residual = nn.functional.pad(residual,
(0, 0, 0, -num_tokens % tp_size))
chunk_residual = torch.tensor_split(residual, tp_size, dim=0)
tp_rank = get_tensor_model_parallel_rank()
residual = chunk_residual[tp_rank]
# Fully Connected
hidden_states, residual = self.post_attention_layernorm(
hidden_states, residual)
if isinstance(self.mlp, CustomDeepseekV2MoE):
hidden_states = self.mlp(hidden_states, attn_metadata)
else:
hidden_states = self.mlp(hidden_states)
if isinstance(
self.mlp,
CustomDeepseekV2MLP) and hidden_states.dtype == torch.float16:
# Fix FP16 overflow
# Scaling the DeepseekV2MLP output, it is the input of
# input_layernorm of next decoder layer.
# The scaling of DeepseekV2MOE output would be done in the forward
# of DeepseekV2MOE
hidden_states *= 1. / self.routed_scaling_factor
# for last layer of main model and mtp layer.
if self.enable_shared_expert_dp and self.layer_idx >= (
self.layers - 1) and tp_size > 1:
hidden_states = get_tp_group().all_gather(hidden_states, 0)
residual = get_tp_group().all_gather(residual, 0)
attn_metadata = get_forward_context().attn_metadata
if attn_metadata is not None:
num_tokens = attn_metadata.num_actual_tokens
else:
num_tokens = hidden_states.shape[0]
if num_tokens < hidden_states.shape[0]:
hidden_states = hidden_states[:num_tokens]
residual = residual[:num_tokens]
return hidden_states, residual
class CustomDeepseekV2Model(nn.Module):
fall_back_to_pt_during_load = False
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
super().__init__()
config = vllm_config.model_config.hf_config
model_config = vllm_config.model_config
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
self.tp_size = get_tensor_model_parallel_world_size()
if get_pp_group().is_first_rank:
self.embed_tokens = VocabParallelEmbedding(
config.vocab_size,
config.hidden_size,
quant_config=quant_config,
prefix=f"{prefix}.embed_tokens")
else:
self.embed_tokens = PPMissingLayer()
self.start_layer, self.end_layer, self.layers = make_layers(
config.num_hidden_layers,
lambda prefix: CustomDeepseekV2DecoderLayer(
config,
prefix,
model_config=model_config,
cache_config=cache_config,
quant_config=quant_config,
),
prefix=f"{prefix}.layers")
if get_pp_group().is_last_rank:
self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
else:
self.norm = PPMissingLayer()
self.make_empty_intermediate_tensors = (
make_empty_intermediate_tensors_factory(
["hidden_states", "residual"], config.hidden_size))
def get_input_embeddings(self, input_ids: torch.Tensor) -> torch.Tensor:
return self.embed_tokens(input_ids)
def forward(
self,
input_ids: torch.Tensor,
positions: torch.Tensor,
kv_caches: Optional[List[torch.Tensor]] = None,
attn_metadata: Optional[AttentionMetadata] = None,
intermediate_tensors: Optional[IntermediateTensors] = None,
inputs_embeds: Optional[torch.Tensor] = None,
) -> Union[torch.Tensor, IntermediateTensors]:
if get_pp_group().is_first_rank:
if inputs_embeds is not None:
hidden_states = inputs_embeds
else:
hidden_states = self.get_input_embeddings(input_ids)
residual = None
else:
assert intermediate_tensors is not None
hidden_states = intermediate_tensors["hidden_states"]
residual = intermediate_tensors["residual"]
replace_allreduce = hidden_states.shape[0] % self.tp_size == 0
for i in range(self.start_layer, self.end_layer):
layer = self.layers[i]
hidden_states, residual = layer(
positions,
hidden_states,
residual,
kv_caches[i -
self.start_layer] if kv_caches is not None else None,
attn_metadata,
replace_allreduce=replace_allreduce)
if not get_pp_group().is_last_rank:
return IntermediateTensors({
"hidden_states": hidden_states,
"residual": residual
})
hidden_states, _ = self.norm(hidden_states, residual)
return hidden_states
class CustomDeepseekV2ForCausalLM(DeepseekV2ForCausalLM):
@@ -838,9 +333,21 @@ class CustomDeepseekV2ForCausalLM(DeepseekV2ForCausalLM):
quant_config = vllm_config.quant_config
self.config = config
self.quant_config = quant_config
self.model = CustomDeepseekV2Model(vllm_config=vllm_config,
prefix=maybe_prefix(
prefix, "model"))
# `packed_modules_mapping` needs to be modified before
# initializing DeepseekV2Model, as it is passed inplace to
# quantization config init and may be used to select the
# quant_method for relevant layers during initialization.
self.fuse_qkv_a_proj = hasattr(
config, "q_lora_rank") and config.q_lora_rank is not None
if self.fuse_qkv_a_proj:
self.packed_modules_mapping["fused_qkv_a_proj"] = [
"q_a_proj",
"kv_a_proj_with_mqa",
]
self.model = DeepseekV2Model(vllm_config=vllm_config,
prefix=maybe_prefix(prefix, "model"))
if get_pp_group().is_last_rank:
self.lm_head = ParallelLMHead(config.vocab_size,
config.hidden_size,
@@ -850,9 +357,36 @@ class CustomDeepseekV2ForCausalLM(DeepseekV2ForCausalLM):
else:
self.lm_head = PPMissingLayer()
self.logits_processor = LogitsProcessor(config.vocab_size)
self.sampler = get_sampler()
self.make_empty_intermediate_tensors = (
self.model.make_empty_intermediate_tensors)
self.expert_weights: list[Any] = []
# Set MoE hyperparameters
self.num_moe_layers = (config.num_hidden_layers -
config.first_k_dense_replace)
self.num_expert_groups = config.n_group
self.moe_layers: list[FusedMoE] = []
example_moe = None
for layer in self.model.layers:
if isinstance(layer, PPMissingLayer):
continue
assert isinstance(layer, DeepseekV2DecoderLayer)
if isinstance(layer.mlp, DeepseekV2MoE):
# Pick last one layer since the first ones may be dense layers.
example_moe = layer.mlp
self.moe_layers.append(layer.mlp.experts)
if example_moe is None:
raise RuntimeError("No DeepseekV2MoE layer found in model.layers.")
self.num_logical_experts = example_moe.n_logical_experts
self.num_physical_experts = example_moe.n_physical_experts
self.num_local_physical_experts = example_moe.n_local_physical_experts
self.num_routed_experts = example_moe.n_routed_experts
self.num_shared_experts = example_moe.n_shared_experts
self.num_redundant_experts = example_moe.n_redundant_experts
# NOTE: This `load_weights` is mainly copied from
# https://github.com/vllm-project/vllm/commit/07b8fae219b1fff51ef115c38c44b51395be5bb5
@@ -950,16 +484,5 @@ class CustomDeepseekV2ForCausalLM(DeepseekV2ForCausalLM):
loaded_params.add(name)
return loaded_params
def forward(
self,
input_ids: torch.Tensor,
positions: torch.Tensor,
kv_caches: Optional[List[torch.Tensor]] = None,
attn_metadata: Optional[AttentionMetadata] = None,
intermediate_tensors: Optional[IntermediateTensors] = None,
inputs_embeds: Optional[torch.Tensor] = None,
) -> Union[torch.Tensor, IntermediateTensors]:
hidden_states = self.model(input_ids, positions, kv_caches,
attn_metadata, intermediate_tensors,
inputs_embeds)
return hidden_states
DeepseekV2DecoderLayer.__init__ = CustomDeepseekV2DecoderLayer.__init__

View File

@@ -227,61 +227,9 @@ def process_weights_after_loading(self, layer):
class AscendFusedMoE(FusedMoE):
def __init__(
self,
num_experts,
top_k,
hidden_size,
intermediate_size,
params_dtype=None,
reduce_results=False,
renormalize=True,
use_grouped_topk=False,
num_expert_group=None,
topk_group=None,
quant_config=None,
tp_size=None,
ep_size=None,
dp_size=None,
prefix="",
custom_routing_function=None,
scoring_func="softmax",
routed_scaling_factor: float = 1.0,
e_score_correction_bias=None,
apply_router_weight_on_input=False,
activation="silu",
enable_eplb=False,
num_redundant_experts=0,
has_bias=False,
):
super().__init__(
num_experts,
top_k,
hidden_size,
intermediate_size,
params_dtype,
reduce_results,
renormalize,
use_grouped_topk,
num_expert_group,
topk_group,
quant_config,
tp_size,
ep_size,
dp_size,
prefix,
custom_routing_function,
scoring_func,
routed_scaling_factor,
e_score_correction_bias,
apply_router_weight_on_input,
activation,
enable_eplb,
num_redundant_experts,
has_bias,
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hidden_size = hidden_size
self.moe_config.tp_group = get_tp_group()
self.moe_config.dp_group = get_dp_group()
self.moe_config.ep_group = get_ep_group()

View File

@@ -17,4 +17,3 @@
import vllm_ascend.patch.platform.patch_common.patch_distributed # noqa
import vllm_ascend.patch.platform.patch_common.patch_mamba_config # noqa
import vllm_ascend.patch.platform.patch_common.patch_shared_fused_moe # noqa

View File

@@ -18,3 +18,4 @@
import vllm_ascend.patch.worker.patch_common.patch_distributed # noqa
import vllm_ascend.patch.worker.patch_common.patch_logits # noqa
import vllm_ascend.patch.worker.patch_common.patch_minicpm # noqa
import vllm_ascend.patch.worker.patch_common.patch_shared_fused_moe # noqa

View File

@@ -238,7 +238,7 @@ class NPUPlatform(Platform):
compilation_config.level = CompilationLevel.NO_COMPILATION
if parallel_config and parallel_config.worker_cls == "auto":
if ascend_config.torchair_graph_config.enabled:
if ascend_config.torchair_graph_config.enabled or ascend_config.enable_shared_expert_dp:
parallel_config.worker_cls = "vllm_ascend.torchair.torchair_worker.NPUTorchairWorker"
else:
parallel_config.worker_cls = "vllm_ascend.worker.worker_v1.NPUWorker"
@@ -289,7 +289,12 @@ class NPUPlatform(Platform):
if not use_v1:
raise ValueError("vLLM Ascend does not support V0 engine.")
use_torchair = get_ascend_config().torchair_graph_config.enabled
ascend_config = get_ascend_config()
if use_mla and ascend_config.enable_shared_expert_dp:
return "vllm_ascend.torchair.torchair_mla.AscendMLATorchairBackend"
use_torchair = ascend_config.torchair_graph_config.enabled
# choose attention backend based on use_mla and use_torchair
backend_map = {
(True, True):

View File

@@ -49,11 +49,17 @@ from vllm_ascend.worker.model_runner_v1 import NPUModelRunner
class NPUTorchairModelRunner(NPUModelRunner):
def __init__(self, vllm_config: VllmConfig, device: torch.device):
ascend_config = get_ascend_config()
self.enable_shared_expert_dp = ascend_config.enable_shared_expert_dp
super().__init__(vllm_config, device)
self.attn_metadata_builder = self.attn_backend.get_builder_cls()(
None, None, vllm_config, device)
ascend_config = get_ascend_config()
register_torchair_model()
torchair_ops_patch()
torchair_quant_method_register()
if self.enable_shared_expert_dp:
return
self.new_kv_cache_bytes = -1
self.torchair_compiled_model = None # type: ignore
self.torchair_compiled_models = {} # type: ignore
@@ -72,14 +78,14 @@ class NPUTorchairModelRunner(NPUModelRunner):
recompiles=envs_ascend.VLLM_ASCEND_TRACE_RECOMPILES)
self._check_batch_sizes_consistency()
register_torchair_model()
torchair_ops_patch()
torchair_quant_method_register()
def _sync_metadata_across_dp(
self, num_tokens: int, with_prefill: bool, enable_dbo: bool
) -> tuple[int, Optional[torch.Tensor], bool, bool]:
"""Override from NPUModelRunner to pad num_tokens"""
if self.enable_shared_expert_dp:
return super()._sync_metadata_across_dp(num_tokens, with_prefill,
enable_dbo)
if self.dp_size == 1:
if not with_prefill:
maybe_padded_num_tokens = self.select_torchair_padded_batch_size(
@@ -115,7 +121,10 @@ class NPUTorchairModelRunner(NPUModelRunner):
def _build_attention_metadata(self, with_prefill, num_reqs, skip_attn):
# NOTE: If torchair graph mode and not with_prefill,
# we can't skip_attn, it will cause graph recompile.
if not with_prefill:
if with_prefill or self.enable_shared_expert_dp:
attn_metadata = super()._build_attention_metadata(
with_prefill, num_reqs, skip_attn)
else:
common_attn_metadata = TorchairCommonAttentionMetadata(
num_reqs=num_reqs,
num_actual_tokens=1,
@@ -126,17 +135,19 @@ class NPUTorchairModelRunner(NPUModelRunner):
)
attn_metadata = self.attn_metadata_builder.build_torchair_graph_dummy(
common_attn_metadata)
else:
attn_metadata = super()._build_attention_metadata(
with_prefill, num_reqs, skip_attn)
return attn_metadata
def _generate_dummy_run_hidden_states(self, with_prefill,
is_torchair_compile, input_ids,
positions, attn_metadata, num_tokens,
intermediate_tensors, inputs_embeds):
if not with_prefill:
if with_prefill or self.enable_shared_expert_dp:
if is_310p():
converting_weight_acl_format(self.model, ACL_FORMAT_FRACTAL_ND)
hidden_states = super()._generate_dummy_run_hidden_states(
with_prefill, is_torchair_compile, input_ids, positions,
attn_metadata, num_tokens, intermediate_tensors, inputs_embeds)
else:
# Only mark static while compiling
if is_torchair_compile:
torch._dynamo.mark_static(input_ids)
@@ -168,15 +179,11 @@ class NPUTorchairModelRunner(NPUModelRunner):
inputs_embeds=None,
**model_kwargs,
)
else:
if is_310p():
converting_weight_acl_format(self.model, ACL_FORMAT_FRACTAL_ND)
hidden_states = super()._generate_dummy_run_hidden_states(
with_prefill, is_torchair_compile, input_ids, positions,
attn_metadata, num_tokens, intermediate_tensors, inputs_embeds)
return hidden_states
def _convert_torch_format(self, kv_cache):
if self.enable_shared_expert_dp:
return super()._convert_torch_format(kv_cache)
kv_cache = torch_npu.npu_format_cast(kv_cache, ACL_FORMAT_FRACTAL_ND)
return kv_cache
@@ -194,6 +201,8 @@ class NPUTorchairModelRunner(NPUModelRunner):
def _capture_model(self):
"""Override from NPUModelRunner to use torchair graph capture."""
if self.enable_shared_expert_dp:
return super()._capture_model()
# TODO(NeverRaR): Calling graph_capture(device=self.device) in
# torchair graph capture can cause some issues, so now we just
# temporarily split the codepath for the two different graph patterns.
@@ -233,6 +242,8 @@ class NPUTorchairModelRunner(NPUModelRunner):
self.new_kv_cache_bytes)
def _use_aclgraph(self) -> bool:
if self.enable_shared_expert_dp:
return super()._use_aclgraph()
return False
def _check_batch_sizes_consistency(self) -> None:
@@ -258,10 +269,10 @@ class NPUTorchairModelRunner(NPUModelRunner):
)
def _update_graph_pad_size(self, with_prefill, graph_pad_size):
if not with_prefill:
self.graph_pad_size = graph_pad_size
else:
if with_prefill or self.enable_shared_expert_dp:
super()._update_graph_pad_size(with_prefill, graph_pad_size)
else:
self.graph_pad_size = graph_pad_size
def _update_input_ids_and_positions(self, input_ids, positions,
num_input_tokens, with_prefill,
@@ -271,7 +282,9 @@ class NPUTorchairModelRunner(NPUModelRunner):
input_ids, positions, num_input_tokens, with_prefill,
padded_num_tokens_across_dp)
if not with_prefill:
if with_prefill or self.enable_shared_expert_dp:
return input_ids, positions
else:
input_ids = self.input_ids[:padded_num_tokens_across_dp]
positions = self.positions[:padded_num_tokens_across_dp]
return input_ids, positions
@@ -284,6 +297,10 @@ class NPUTorchairModelRunner(NPUModelRunner):
if attn_metadata is not None and isinstance(attn_metadata, dict):
attn_metadata = attn_metadata['model.layers.0.self_attn.attn']
if self.enable_shared_expert_dp:
return super()._generate_process_reqs_hidden_states(
attn_metadata, with_prefill, padded_num_tokens_across_dp,
input_ids, positions, intermediate_tensors, inputs_embeds)
model_kwargs = {
"kv_caches": self.kv_caches,
"attn_metadata": attn_metadata
@@ -468,8 +485,7 @@ class NPUTorchairModelRunner(NPUModelRunner):
self.torchair_graph_batch_sizes = new_graph_batch_sizes
def _build_drafter_prepare_inputs_torchair_param(self):
return True
def get_dp_padding(self, num_tokens):
"""Override from NPUModelRunner to get dp padding"""
return 0, None
if self.enable_shared_expert_dp:
return super()._build_drafter_prepare_inputs_torchair_param()
else:
return True

View File

@@ -32,9 +32,10 @@ class NPUTorchairWorker(NPUWorker):
"""Override determine_available_memory to use cached torchair kv_cache_bytes."""
available_kv_cache_memory = super().determine_available_memory()
if get_ascend_config(
).torchair_graph_config.use_cached_kv_cache_bytes and check_kv_cache_bytes_cache_exist(
ascend_config = get_ascend_config()
if ascend_config.enable_shared_expert_dp:
return available_kv_cache_memory
if ascend_config.torchair_graph_config.use_cached_kv_cache_bytes and check_kv_cache_bytes_cache_exist(
):
old_kv_cache_bytes = read_kv_cache_bytes_from_file(
torch.distributed.get_rank())