[Dist][EP] Remove ETP/EP maintained in vllm-ascend (#1681)

### What this PR does / why we need it?
Remove the ETP/EP implementation maintained in the main branch. We drop this because there are no relevant scenarios that need ETP now; we may subsequently advocate implementing expert tensor parallelism in vLLM itself to support scenarios where experts need to be sliced.

This is part of the #1422 backport.

Fixes https://github.com/vllm-project/vllm-ascend/issues/1396
https://github.com/vllm-project/vllm-ascend/issues/1154

### Does this PR introduce _any_ user-facing change?
We will no longer maintain ETP/EP in vllm-ascend; the TP/EP implementation in vLLM is used instead.
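
To enable expert parallelism for MoE models after this change, pass vLLM's own parallel options; a minimal offline sketch mirroring the updated docs in this PR (the model path and sizes are illustrative):

```python
from vllm import LLM

# Expert parallelism is requested through vLLM's native parallel config;
# the former additional_config={"expert_tensor_parallel_size": ...} knob is gone.
llm = LLM(
    model="/path/to/moe-model",        # illustrative path
    tensor_parallel_size=4,
    enable_expert_parallel=True,
    distributed_executor_backend="mp",
    max_model_len=1024,
    trust_remote_code=True,
)
outputs = llm.generate(["The capital of France is"])
print(outputs[0].outputs[0].text)
```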

### How was this patch tested?
CI passed with newly added and existing tests.


- vLLM version: v0.9.2
- vLLM main: fe8a2c544a

Signed-off-by: MengqingCao <cmq0113@163.com>
Author: Mengqing Cao
Date: 2025-07-21 09:08:04 +08:00 (committed by GitHub)
Parent: a8b316ac5b
Commit: 8cfd257992
24 changed files with 66 additions and 548 deletions

@@ -48,6 +48,7 @@ Run the following script to start the vLLM server on Multi-NPU:
```bash
vllm serve /path/to/pangu-pro-moe-model \
--tensor-parallel-size 4 \
--enable-expert-parallel \
--trust-remote-code \
--enforce-eager
```
@@ -145,6 +146,7 @@ if __name__ == "__main__":
llm = LLM(model="/path/to/pangu-pro-moe-model",
tensor_parallel_size=4,
enable_expert_parallel=True,
distributed_executor_backend="mp",
max_model_len=1024,
trust_remote_code=True,

@@ -28,7 +28,6 @@ The following table lists the additional configuration options available in vLLM
|-------------------------------| ---- |------|-----------------------------------------------------------------------------------------------|
| `torchair_graph_config` | dict | `{}` | The config options for torchair graph mode |
| `ascend_scheduler_config` | dict | `{}` | The config options for ascend scheduler |
| `expert_tensor_parallel_size` | str | `0` | Expert tensor parallel size the model to use. |
| `refresh` | bool | `false` | Whether to refresh global ascend config content. This value is usually used by rlhf or ut/e2e test case. |
| `expert_map_path` | str | `None` | When using expert load balancing for the MOE model, an expert map path needs to be passed in. |
| `chunked_prefill_for_mla` | bool | `False` | Whether to enable the fused operator-like chunked_prefill. |
@@ -75,7 +74,6 @@ An example of additional configuration is as follows:
"enabled": True,
"enable_chunked_prefill": True,
},
"expert_tensor_parallel_size": 1,
"refresh": False,
}
```

@@ -1,22 +0,0 @@
export TASK_QUEUE_ENABLE=1
source /usr/local/Ascend/ascend-toolkit/set_env.sh
source /usr/local/Ascend/nnal/atb/set_env.sh
export ASCEND_LAUNCH_BLOCKING=0
export VLLM_VERSION=0.9.1
nohup python -m vllm.entrypoints.openai.api_server --model=/mnt/deepseek/DeepSeek-R1-W8A8-VLLM \
--served-model-name auto \
--quantization ascend \
--trust-remote-code \
--distributed-executor-backend=mp \
--port 8006 \
-tp=8 \
-dp=2 \
--max-num-seqs 24 \
--max-model-len 32768 \
--max-num-batched-tokens 32768 \
--block-size 128 \
--no-enable-prefix-caching \
--additional-config '{"torchair_graph_config":{"enabled":true,"use_cached_graph":true,"graph_batch_sizes":[24]},"ascend_scheduler_config":{"enabled":true},"expert_tensor_parallel_size":16}' \
--gpu-memory-utilization 0.96 &> run.log &
disown

@@ -1,57 +0,0 @@
#!/bin/bash
# Concurrency array
concurrency_array=(48)
#best rate
rate_array=(0.7)
# Result file
result_file="benchmark_results.txt"
echo "Benchmark Results" > $result_file
echo "===================" >> $result_file
# Loop through all combinations
for concurrency in "${concurrency_array[@]}"; do
for rate in "${rate_array[@]}"; do
echo "Testing with concurrency=$concurrency, rate=$rate"
echo "" >> $result_file
echo "Concurrency: $concurrency, Request Rate: $rate" >> $result_file
echo "-------------------" >> $result_file
# Run benchmark test
python /mnt/deepseek/vllm/benchmarks/benchmark_serving.py \
--backend vllm \
--trust-remote-code \
--model auto \
--tokenizer /mnt/deepseek/DeepSeek-R1-W8A8-VLLM \
--dataset-name random \
--random-input-len 4096 \
--random-output-len 1536 \
--ignore-eos \
--num-prompts 400 \
--max-concurrency $concurrency \
--request-rate $rate \
--metric-percentiles 90 \
--base-url http://localhost:8006 2>&1 | tee -a $result_file
# Wait for system cool down
sleep 30
done
done
# Analyze results
echo "Analysis Results" > analysis_results.txt
echo "=================" >> analysis_results.txt
# Extract and analyze TPOT data
echo "TPOT Analysis:" >> analysis_results.txt
grep "Mean TPOT" $result_file | awk -F':' '{
printf "Concurrency %s, Rate %s: %s ms\n", $1, $2, $3
}' >> analysis_results.txt
# Extract and analyze throughput data
echo -e "\nThroughput Analysis:" >> analysis_results.txt
grep "Output token throughput" $result_file | awk -F':' '{
printf "Concurrency %s, Rate %s: %s tokens/s\n", $1, $2, $3
}' >> analysis_results.txt
echo "Testing completed. Results saved in $result_file and analysis in analysis_results.txt"

@@ -36,7 +36,7 @@ COMPLETIONS_URL = f"http://{SERVER_HOST}:{SERVER_PORT}/v1/completions"
# pre-trained model path on Hugging Face.
# Qwen/Qwen2.5-0.5B-Instruct: accuracy test for DP.
# Qwen/Qwen3-30B-A3B: accuracy test for EP and ETP.
# Qwen/Qwen3-30B-A3B: accuracy test for EP.
# deepseek-ai/DeepSeek-V2-Lite: accuracy test for TP.
MODEL_NAME = ["Qwen/Qwen3-30B-A3B", "deepseek-ai/DeepSeek-V2-Lite"]
@@ -200,62 +200,3 @@ def test_lm_eval_accuracy_dp(model, max_tokens):
except subprocess.TimeoutExpired:
server_proc.kill()
server_proc.wait()
@pytest.mark.parametrize("max_tokens", [10])
@pytest.mark.parametrize("model", ["Qwen/Qwen3-30B-A3B"])
def test_lm_eval_accuracy_etp(model, max_tokens):
log_file = open("accuracy_etp.log", "a+")
cmd = [
"vllm", "serve", model, "--max_model_len", "4096",
"--tensor_parallel_size", "4", "--enforce_eager",
"--enable_expert_parallel", "--additional_config",
'{"expert_tensor_parallel_size": "4"}'
]
server_proc = subprocess.Popen(cmd,
stdout=log_file,
stderr=subprocess.DEVNULL)
try:
for _ in range(300):
try:
r = requests.get(HEALTH_URL, timeout=1)
if r.status_code == 200:
break
except requests.exceptions.RequestException:
pass
time.sleep(1)
else:
log_file.flush()
log_file.seek(0)
log_content = log_file.read()
pytest.fail(
f"vLLM serve did not become healthy after 300s: {HEALTH_URL}\n"
f"==== vLLM Serve Log Start ===\n{log_content}\n==== vLLM Serve Log End ==="
)
prompt = "bejing is a"
payload = {
"prompt": prompt,
"max_tokens": max_tokens,
"sampling_params": {
"temperature": 0.0,
"top_p": 1.0,
"seed": 123
}
}
resp = requests.post(COMPLETIONS_URL, json=payload, timeout=30)
resp.raise_for_status()
data = resp.json()
generated = data["choices"][0]["text"].strip()
expected = "city in china. it is the capital city of"
assert generated == expected, f"Expected `{expected}`, got `{generated}`"
finally:
server_proc.send_signal(signal.SIGINT)
try:
server_proc.wait(timeout=10)
except subprocess.TimeoutExpired:
server_proc.kill()
server_proc.wait()

@@ -0,0 +1,30 @@
import pytest
from tests.e2e.conftest import VllmRunner
from tests.e2e.model_utils import check_outputs_equal
@pytest.mark.parametrize("model_name", ["deepseek-ai/DeepSeek-V2-Lite-Chat"])
def test_e2e_ep_correctness(model_name):
example_prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
max_tokens = 5
with VllmRunner(model_name, tensor_parallel_size=2) as vllm_model:
tp_output = vllm_model.generate_greedy(example_prompts, max_tokens)
with VllmRunner(model_name,
tensor_parallel_size=2,
enable_expert_parallel=True) as vllm_model:
ep_output = vllm_model.generate_greedy(example_prompts, max_tokens)
check_outputs_equal(
outputs_0_lst=ep_output,
outputs_1_lst=tp_output,
name_0="ep_output",
name_1="tp_output",
)

@@ -50,7 +50,6 @@ def test_generate_with_allgather():
"enabled": True,
"chunked_prefill_enabled": False,
},
"expert_tensor_parallel_size": 1
}) as vllm_model:
vllm_model.generate(example_prompts, sampling_params)
@@ -74,6 +73,5 @@ def test_generate_with_alltoall():
"enabled": True,
"chunked_prefill_enabled": False,
},
"expert_tensor_parallel_size": 1
}) as vllm_model:
vllm_model.generate(example_prompts, sampling_params)

@@ -123,6 +123,7 @@ def _pangu_torchair_test_fixture(
distributed_executor_backend="mp",
enforce_eager=False,
additional_config=additional_config,
enable_expert_parallel=True,
) as vllm_model:
# use greedy sampler to make sure the generated results are fix
vllm_output = vllm_model.generate_greedy(example_prompts, 5)

@@ -1,208 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
from unittest.mock import MagicMock, patch
import pytest
from vllm.distributed.parallel_state import GroupCoordinator
import vllm_ascend
from tests.ut.base import TestBase
from vllm_ascend.distributed.parallel_state import (
destory_ascend_model_parallel, get_ep_group, get_etp_group,
init_ascend_model_parallel, model_parallel_initialized)
class TestParallelState(TestBase):
@patch('vllm_ascend.distributed.parallel_state._EP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
def test_get_ep_group_when_initialized(self, mock_ep):
# Act
result = get_ep_group()
# Assert
assert isinstance(result, GroupCoordinator)
@patch('vllm_ascend.distributed.parallel_state._EP', None)
def test_get_ep_group_when_not_initialized(self):
# Act & Assert
with pytest.raises(AssertionError) as excinfo:
get_ep_group()
assert "expert model parallel group is not initialized" in str(
excinfo.value)
@patch('vllm_ascend.distributed.parallel_state._ETP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
def test_get_etp_group_when_initialized(self, mock_etp):
# Act
result = get_etp_group()
# Assert
assert isinstance(result, GroupCoordinator)
@patch('vllm_ascend.distributed.parallel_state._ETP', None)
def test_get_etp_group_when_not_initialized(self):
# Act & Assert
with pytest.raises(AssertionError) as excinfo:
get_etp_group()
assert "expert tensor parallel group is not initialized" in str(
excinfo.value)
@patch('vllm_ascend.distributed.parallel_state._ETP', None)
@patch('vllm_ascend.distributed.parallel_state._EP', None)
def test_model_parallel_initialized_when_both_none(self):
# Act & Assert
assert not model_parallel_initialized()
@patch('vllm_ascend.distributed.parallel_state._ETP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
@patch('vllm_ascend.distributed.parallel_state._EP', None)
def test_model_parallel_initialized_when_ep_none(self, mock_etp):
# Act & Assert
assert not model_parallel_initialized()
@patch('vllm_ascend.distributed.parallel_state._ETP', None)
@patch('vllm_ascend.distributed.parallel_state._EP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
def test_model_parallel_initialized_when_etp_none(self, mock_ep):
# Act & Assert
assert not model_parallel_initialized()
@patch('vllm_ascend.distributed.parallel_state._ETP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
@patch('vllm_ascend.distributed.parallel_state._EP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
def test_model_parallel_initialized_when_etp_initialized(
self, mock_ep, mock_etp):
# Act & Assert
assert model_parallel_initialized()
@patch('vllm_ascend.distributed.parallel_state._ETP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
@patch('vllm_ascend.distributed.parallel_state._EP',
new_callable=lambda: MagicMock(spec=GroupCoordinator))
def test_destroy_when_both_exist(self, mock_ep, mock_etp):
# Act
destory_ascend_model_parallel()
# Assert
mock_ep.destroy.assert_called_once()
mock_etp.destroy.assert_called_once()
assert vllm_ascend.distributed.parallel_state._ETP is None
assert vllm_ascend.distributed.parallel_state._EP is None
@patch('vllm_ascend.distributed.parallel_state._ETP', None)
@patch('vllm_ascend.distributed.parallel_state._EP',
new_callable=lambda: MagicMock())
def test_destory_ascend_model_parallel_when_etp_none(self, mock_ep):
# Act
destory_ascend_model_parallel()
# Assert
mock_ep.destroy.assert_called_once()
assert vllm_ascend.distributed.parallel_state._EP is None
assert vllm_ascend.distributed.parallel_state._ETP is None
@patch('vllm_ascend.distributed.parallel_state._ETP',
new_callable=lambda: MagicMock())
@patch('vllm_ascend.distributed.parallel_state._EP', None)
def test_destory_ascend_model_parallel_when_ep_none(self, mock_etp):
# Act
destory_ascend_model_parallel()
# Assert
mock_etp.destroy.assert_called_once()
assert vllm_ascend.distributed.parallel_state._ETP is None
assert vllm_ascend.distributed.parallel_state._EP is None
@patch('vllm_ascend.distributed.parallel_state._ETP', None)
@patch('vllm_ascend.distributed.parallel_state._EP', None)
def test_destory_ascend_model_parallel_when_both_none(self):
# Act
destory_ascend_model_parallel()
# Assert
assert vllm_ascend.distributed.parallel_state._ETP is None
assert vllm_ascend.distributed.parallel_state._EP is None
@patch('torch.distributed.is_initialized', return_value=True)
@patch('torch.distributed.get_world_size', return_value=8)
@patch('vllm_ascend.distributed.parallel_state.get_world_group',
return_value=MagicMock(device_group='npu:0', local_rank=0))
@patch('torch.distributed.get_backend', return_value='hccl')
@patch('vllm_ascend.distributed.parallel_state.init_model_parallel_group')
@patch('vllm_ascend.distributed.parallel_state.model_parallel_initialized',
return_value=False)
def test_init_ascend_model_parallel_normal_case(
self, mock_mp_init, mock_init_group, mock_get_backend,
mock_world_group, mock_get_world_size, mock_is_init):
"""Test normal initialization with default parameters"""
# Act
init_ascend_model_parallel()
# Assert
mock_init_group.assert_any_call([[0, 1, 2, 3, 4, 5, 6, 7]],
0,
'hccl',
group_name="ep")
mock_init_group.assert_any_call([[0]], 0, 'hccl', group_name="etp")
self.assertIsNotNone(vllm_ascend.distributed.parallel_state._EP)
self.assertIsNotNone(vllm_ascend.distributed.parallel_state._ETP)
@patch('vllm_ascend.distributed.parallel_state.model_parallel_initialized',
return_value=True)
def test_init_ascend_model_parallel_skip_if_initialized(
self, mock_mp_init):
"""Test skipping when model parallel already initialized"""
with patch.object(vllm_ascend.distributed.parallel_state,
'_EP') as mock_ep, patch.object(
vllm_ascend.distributed.parallel_state,
'_ETP') as mock_etp:
# Act
init_ascend_model_parallel()
# Assert
mock_ep.assert_not_called()
mock_etp.assert_not_called()
@patch('torch.distributed.is_initialized', return_value=False)
def test_init_ascend_model_parallel_assert_dist_not_init(
self, mock_is_init):
"""Test assertion when distributed not initialized"""
# Act & Assert
with self.assertRaises(AssertionError):
init_ascend_model_parallel()
@patch('torch.distributed.is_initialized', return_value=True)
@patch('torch.distributed.get_world_size', return_value=8)
@patch('vllm_ascend.distributed.parallel_state.get_world_group',
return_value=MagicMock(device_group='npu:0', local_rank=1))
@patch('torch.distributed.get_backend', return_value='hccl')
@patch('vllm_ascend.distributed.parallel_state.init_model_parallel_group')
@patch('vllm_ascend.distributed.parallel_state.model_parallel_initialized',
return_value=False)
def test_init_ascend_model_parallel_custom_params(
self, mock_mp_init, mock_init_group, mock_get_backend,
mock_world_group, mock_get_world_size, mock_is_init):
"""Test initialization with custom parallel sizes"""
# Act
init_ascend_model_parallel(expert_parallel_size=2,
expert_tensor_parallel_size=4,
world_size=8,
backend='hccl')
#Assert
mock_init_group.assert_any_call([[0, 4], [1, 5], [2, 6], [3, 7]],
1,
'hccl',
group_name="ep")
mock_init_group.assert_any_call([[0, 1, 2, 3], [4, 5, 6, 7]],
1,
'hccl',
group_name="etp")

@@ -42,7 +42,6 @@ class TestAscendConfig(TestBase):
test_vllm_config = VllmConfig()
# No additional config given, check the default value here.
ascend_config = init_ascend_config(test_vllm_config)
self.assertEqual(ascend_config.expert_tensor_parallel_size, 0)
self.assertIsNone(ascend_config.expert_map_path)
torchair_graph_config = ascend_config.torchair_graph_config
@@ -75,12 +74,10 @@ class TestAscendConfig(TestBase):
"ascend_scheduler_config": {
"enabled": True
},
"expert_tensor_parallel_size": 1,
"expert_map_path": "test_expert_map_path",
"refresh": True
}
ascend_config = init_ascend_config(test_vllm_config)
self.assertEqual(ascend_config.expert_tensor_parallel_size, 1)
self.assertEqual(ascend_config.expert_map_path, "test_expert_map_path")
torchair_graph_config = ascend_config.torchair_graph_config

@@ -28,7 +28,6 @@ class TestNPUPlatform(TestBase):
self.mock_vllm_config.speculative_config = None
self.mock_ascend_config = MagicMock()
self.mock_ascend_config.expert_tensor_parallel_size = 0
self.mock_ascend_config.torchair_graph_config.enabled = False
self.mock_ascend_config.ascend_scheduler_config.enabled = False
@@ -253,30 +252,6 @@ class TestNPUPlatform(TestBase):
mock_init_ascend.assert_called_once_with(self.mock_vllm_config)
mock_check_ascend.assert_called_once()
@patch("vllm_ascend.utils.is_310p", return_value=False)
@patch("vllm_ascend.ascend_config.check_ascend_config")
@patch("vllm_ascend.ascend_config.init_ascend_config")
def test_check_and_update_config_expert_parallel_enabled(
self, mock_init_ascend, mock_check_ascend, mock_is_310p):
mock_init_ascend.return_value = self.mock_ascend_config
self.mock_vllm_config.parallel_config.enable_expert_parallel = True
self.mock_vllm_config.parallel_config.tensor_parallel_size = 2
self.mock_vllm_config.parallel_config.world_size_across_dp = 4
from vllm_ascend import platform
importlib.reload(platform)
self.platform.check_and_update_config(self.mock_vllm_config)
self.assertEqual(
self.mock_vllm_config.parallel_config.expert_tensor_parallel_size,
1)
self.assertEqual(
self.mock_vllm_config.parallel_config.expert_parallel_size,
self.mock_vllm_config.parallel_config.world_size_across_dp,
)
@patch("vllm_ascend.utils.is_310p", return_value=False)
@patch("vllm_ascend.ascend_config.check_ascend_config")
@patch("vllm_ascend.ascend_config.init_ascend_config")

@@ -44,8 +44,6 @@ class AscendConfig:
self.ascend_scheduler_config = AscendSchedulerConfig(
ascend_scheduler_config)
self.expert_tensor_parallel_size = int(
additional_config.get("expert_tensor_parallel_size", 0))
self.expert_map_path = additional_config.get("expert_map_path", None)
self.chunked_prefill_for_mla = additional_config.get(
"chunked_prefill_for_mla", False)

@@ -1,77 +0,0 @@
from typing import Optional
import torch
from vllm.distributed.parallel_state import (GroupCoordinator, get_world_group,
init_model_parallel_group)
# vllm-ascend will maintain its own EP GroupCoordinator and ETP GroupCoordinator for
# customize parallel solution
_EP: Optional[GroupCoordinator] = None
_ETP: Optional[GroupCoordinator] = None
def get_ep_group() -> GroupCoordinator:
assert _EP is not None, ("expert model parallel group is not initialized")
return _EP
def get_etp_group() -> GroupCoordinator:
assert _ETP is not None, (
"expert tensor parallel group is not initialized")
return _ETP
def model_parallel_initialized():
return (_ETP is not None and _EP is not None)
def init_ascend_model_parallel(
expert_parallel_size: int = 1,
expert_tensor_parallel_size: int = 1,
world_size: Optional[int] = None,
backend: Optional[str] = None,
):
if model_parallel_initialized():
return
assert torch.distributed.is_initialized()
world_size = world_size or torch.distributed.get_world_size()
backend = backend or torch.distributed.get_backend(
get_world_group().device_group)
num_expert_parallel_groups = expert_tensor_parallel_size
num_expert_tensor_parallel_groups = expert_parallel_size
global _EP
group_ranks = []
for i in range(num_expert_parallel_groups):
ranks = list(range(i, world_size, num_expert_parallel_groups))
group_ranks.append(ranks)
_EP = init_model_parallel_group(group_ranks,
get_world_group().local_rank,
backend,
group_name="ep")
group_ranks = []
global _ETP
for i in range(num_expert_tensor_parallel_groups):
ranks = list(
range(i * expert_tensor_parallel_size,
(i + 1) * expert_tensor_parallel_size))
group_ranks.append(ranks)
_ETP = init_model_parallel_group(group_ranks,
get_world_group().local_rank,
backend,
group_name="etp")
def destory_ascend_model_parallel():
global _EP
if _EP:
_EP.destroy()
_EP = None
global _ETP
if _ETP:
_ETP.destroy()
_ETP = None
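
With this module deleted, callers resolve the expert-parallel group from vLLM directly, as the import changes elsewhere in this commit show. A minimal sketch of the replacement usage (assumes the distributed environment and model-parallel groups are already initialized):

```python
from vllm.distributed.parallel_state import get_ep_group

def ep_layout():
    # vLLM's GroupCoordinator exposes the fields the removed wrapper provided:
    # group size, rank within the group, and the underlying process group.
    ep_group = get_ep_group()
    return ep_group.world_size, ep_group.rank_in_group, ep_group.device_group
```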

@@ -39,7 +39,7 @@ from vllm.distributed import (get_pp_group, get_tensor_model_parallel_rank,
tensor_model_parallel_all_gather,
tensor_model_parallel_all_reduce,
tensor_model_parallel_reduce_scatter)
from vllm.distributed.parallel_state import get_dp_group
from vllm.distributed.parallel_state import get_dp_group, get_ep_group
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.activation import SiluAndMul
from vllm.model_executor.layers.layernorm import RMSNorm
@@ -69,7 +69,6 @@ from vllm.model_executor.models.utils import (
from vllm.sequence import IntermediateTensors
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.parallel_state import get_ep_group
from vllm_ascend.ops.fused_moe import AscendFusedMoE
from vllm_ascend.quantization.quant_config import AscendLinearMethod
from vllm_ascend.quantization.w8a8_dynamic import AscendW8A8DynamicLinearMethod

@@ -30,8 +30,8 @@ from vllm.config import CacheConfig, VllmConfig
from vllm.distributed import (divide, get_pp_group,
get_tensor_model_parallel_world_size,
tensor_model_parallel_all_reduce)
from vllm.distributed.parallel_state import (get_dp_group, get_tp_group,
get_world_group)
from vllm.distributed.parallel_state import (get_dp_group, get_ep_group,
get_tp_group, get_world_group)
from vllm.forward_context import get_forward_context
from vllm.logger import init_logger
from vllm.model_executor.layers.activation import SiluAndMul
@@ -58,7 +58,6 @@ from vllm.model_executor.utils import set_weight_attrs
from vllm.sequence import IntermediateTensors
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.parallel_state import get_ep_group
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_310p
logger = init_logger(__name__)
@@ -93,7 +92,7 @@ class CustomMergedColumnParallelLinear(LinearBase):
# Divide the weight matrix along the last dimension.
output_size = sum(output_sizes)
self.output_sizes = output_sizes
self.tp_size = get_world_group().world_size
self.tp_size = get_tp_group().world_size
self.input_size_per_partition = input_size
self.output_size_per_partition = divide(output_size, self.tp_size)
self.output_partition_sizes = [self.output_size_per_partition]
@@ -144,8 +143,8 @@ class CustomMergedColumnParallelLinear(LinearBase):
assert loaded_shard_id < len(self.output_sizes)
tp_rank = get_world_group().rank_in_group
tp_size = get_world_group().world_size
tp_rank = get_tp_group().rank_in_group
tp_size = get_tp_group().world_size
if output_dim is not None:
shard_offset = sum(self.output_sizes[:loaded_shard_id]) // tp_size
shard_size = self.output_sizes[loaded_shard_id] // tp_size
@@ -204,7 +203,7 @@ class CustomRowParallelLinear(LinearBase):
group=None,
):
# Divide the weight matrix along the first dimension.
self.group = group if group is not None else get_world_group()
self.group = group if group is not None else get_tp_group()
self.tp_rank = self.group.rank_in_group
self.tp_size = self.group.world_size
self.input_size_per_partition = divide(input_size, self.tp_size)
@@ -357,7 +356,7 @@ def topk_wrapper(num_voted_experts):
num_tokens = scores.shape[0]
router_scale = _ROUTER_SCALE.squeeze( # type: ignore
)
# TODO: support disable expert parallel
ep_size = get_ep_group().world_size
local_num_experts = global_num_experts // ep_size
local_num_group = topk // ep_size
@@ -464,6 +463,7 @@ class PanguProMoESparseMoeBlock(nn.Module):
custom_routing_function=topk_wrapper(num_voted_experts),
prefix=f"{prefix}.experts",
)
self.use_ep = self.experts.use_ep
self.gate = ReplicatedLinear(
config.hidden_size,

@@ -88,6 +88,7 @@ def forward_oot(
hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
moe_parallel_config=self.moe.moe_parallel_config,
topk_weights=topk_weights,
topk_ids=topk_ids,
top_k=top_k,

@@ -26,7 +26,8 @@ from vllm.config import get_current_vllm_config
from vllm.distributed import (GroupCoordinator, get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
tensor_model_parallel_all_reduce)
from vllm.distributed.parallel_state import get_dp_group, get_tp_group
from vllm.distributed.parallel_state import (get_dp_group, get_ep_group,
get_tp_group)
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.fused_moe.config import \
FusedMoEConfig # isort: skip
@@ -41,7 +42,6 @@ import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.communication_op import \
data_parallel_reduce_scatter
from vllm_ascend.distributed.parallel_state import get_ep_group, get_etp_group
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
from vllm_ascend.utils import (FusedMoEState, dispose_tensor,
get_all_reduce_merge_state, get_fused_moe_state,
@@ -124,6 +124,7 @@ def fused_experts_with_mc2(
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
top_k: int,
moe_parallel_config: FusedMoEParallelConfig,
expert_map: torch.Tensor = None,
moe_all_to_all_group_name: Optional[str] = None,
shared_experts: Optional[Any] = None
@@ -142,22 +143,20 @@ def fused_experts_with_mc2(
rank = torch.distributed.get_rank()
quant_mode = 0
ep_group = get_ep_group().device_group
local_rank = torch.distributed.get_rank(group=ep_group)
all_to_all_group_size = torch.distributed.get_world_size(ep_group)
ep_rank_id = moe_parallel_config.ep_rank
ep_world_size = moe_parallel_config.ep_size
tp_size = get_etp_group().world_size
tp_rank = rank % tp_size
tp_world_size = moe_parallel_config.tp_size
tp_rank = rank % tp_world_size
stage1_kwargs = {
"scales": None,
"quant_mode": quant_mode,
"group_ep": moe_all_to_all_group_name,
"ep_world_size": all_to_all_group_size,
"ep_rank_id": local_rank,
# "group_tp": self.moe_rs_group_name,
"ep_world_size": ep_world_size,
"ep_rank_id": ep_rank_id,
"group_tp": moe_all_to_all_group_name,
"tp_world_size": tp_size,
"tp_world_size": tp_world_size,
"tp_rank_id": tp_rank,
}
kwargs_mc2.update(stage1_kwargs)
@@ -217,12 +216,12 @@ def fused_experts_with_mc2(
stage3_kwargs = {
"ep_send_counts": ep_recv_counts,
"group_ep": moe_all_to_all_group_name,
"ep_world_size": all_to_all_group_size,
"ep_rank_id": local_rank,
"ep_world_size": ep_world_size,
"ep_rank_id": ep_rank_id,
"tp_send_counts": tp_recv_counts,
# "group_tp": self.moe_rs_group_name,
"group_tp": moe_all_to_all_group_name,
"tp_world_size": tp_size,
"tp_world_size": tp_world_size,
"tp_rank_id": tp_rank,
}
kwargs_mc2.update(stage3_kwargs)
@@ -560,6 +559,7 @@ def fused_experts_moge(
hidden_states: torch.Tensor,
w1: torch.Tensor,
w2: torch.Tensor,
moe_parallel_config: FusedMoEParallelConfig,
topk_weights: torch.Tensor,
topk_ids: torch.Tensor,
top_k: int,
@@ -581,7 +581,7 @@ def fused_experts_moge(
Returns:
hidden_states: Hidden states after routing.
"""
ep_size = get_ep_group().world_size
ep_size = moe_parallel_config.ep_size
local_num_experts = global_num_experts // ep_size
local_num_group = top_k // ep_size
@@ -982,7 +982,7 @@ class AscendUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod):
vllm_config = get_current_vllm_config()
self.ep_group = get_ep_group()
self.ep_size = self.ep_group.world_size
self.ep_size = self.moe.moe_parallel_config.ep_size
self.global_batch_size = vllm_config.scheduler_config.max_num_seqs
self.local_batch_size = self.global_batch_size // self.ep_size
self.max_model_len = vllm_config.model_config.max_model_len
@@ -1074,13 +1074,14 @@ class AscendUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod):
if enable_force_load_balance:
topk_ids = torch.randint_like(topk_ids, 0, global_num_experts)
fused_moe_state = get_fused_moe_state(self.ep_group.world_size,
is_prefill, is_deepseek_v3_r1)
fused_moe_state = get_fused_moe_state(self.ep_size, is_prefill,
is_deepseek_v3_r1)
if fused_moe_state == FusedMoEState.MC2:
return fused_experts_with_mc2(
hidden_states=x,
w1=layer.w13_weight,
w2=layer.w2_weight,
moe_parallel_config=self.moe.moe_parallel_config,
topk_weights=topk_weights,
topk_ids=topk_ids,
top_k=top_k,
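
The MC2 path above now derives its communication layout from the `FusedMoEParallelConfig` carried by each MoE layer instead of the removed ETP group. A rough sketch of that pattern (kwarg names follow the hunk above; this is not the full function):

```python
def mc2_parallel_kwargs(moe_parallel_config, rank, moe_all_to_all_group_name):
    # Expert-parallel rank/size come straight from vLLM's per-layer MoE config.
    ep_rank_id = moe_parallel_config.ep_rank
    ep_world_size = moe_parallel_config.ep_size
    # The old expert-tensor-parallel size is replaced by the config's tp_size.
    tp_world_size = moe_parallel_config.tp_size
    tp_rank = rank % tp_world_size
    return {
        "group_ep": moe_all_to_all_group_name,
        "ep_world_size": ep_world_size,
        "ep_rank_id": ep_rank_id,
        "group_tp": moe_all_to_all_group_name,
        "tp_world_size": tp_world_size,
        "tp_rank_id": tp_rank,
    }
```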

@@ -37,17 +37,7 @@
# =================
# ** File: platform/patch_common/patch_distributed.py**
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 1. `vllm.distributed.parallel_state.destroy_model_parallel()`
# Why:
# vllm dose not support outside platform maintain its own `CoordinatorGroup`, vllm-ascend maintain EP and ETP
# inside of the repo, and needs a common interface to destroy them, this patch add the interface of destroy
# platform owned `CoordinatorGroup` to make sure all the CoordinateGroup can be properly destroyed
# How
# Call `vllm_ascend.distributed.parallel_state method `destroy_platform_model_parallel` to destroy all the `CoordinateGroup`
# Related PR (if no, explain why):
# Future Plan:
# Remove those patch when vllm merged them
# 2. `vllm.config.ParallelConfig.get_next_dp_init_port`
# 1. `vllm.config.ParallelConfig.get_next_dp_init_port`
# Why:
# vllm doesn't support get port from environment.
# How

@@ -18,33 +18,12 @@
# This file is a part of the vllm-ascend project.
import torch
import vllm
import vllm.distributed
import vllm.envs as envs
from vllm.config import ParallelConfig
from vllm_ascend.utils import is_310p
def ascend_destroy_model_parallel():
"""Set the groups to none and destroy them."""
from vllm.distributed.parallel_state import _DP, _PP, _TP
if _TP:
_TP.destroy()
_TP = None
if _PP:
_PP.destroy()
_PP = None
if _DP:
_DP.destroy()
_DP = None
from vllm_ascend.distributed.parallel_state import \
destory_ascend_model_parallel
destory_ascend_model_parallel()
def parallel_config_get_dp_port(self) -> int:
"""
We might need to initialize process groups in multiple
@@ -62,7 +41,6 @@ def parallel_config_get_dp_port(self) -> int:
return port
vllm.distributed.parallel_state.destroy_model_parallel = ascend_destroy_model_parallel
ParallelConfig.get_next_dp_init_port = parallel_config_get_dp_port

@@ -131,24 +131,6 @@ class NPUPlatform(Platform):
if kv_cache_dtype is not None:
vllm_config.cache_config.cache_dtype = kv_cache_dtype
if parallel_config:
# Default value for expert tensor parallel size
parallel_config.expert_tensor_parallel_size = parallel_config.tensor_parallel_size
# NOTE: When enable_expert_parallel is True, we follow vLLM convention:
# ep_size = world_size, which means expert_tensor_parallel_size must be 1
if parallel_config.enable_expert_parallel:
parallel_config.expert_tensor_parallel_size = 1
# NOTE: When enable_expert_parallel is False and param `asceend_config.expert_tensor_parallel_size`
# is configured, use ascend_config
elif ascend_config.expert_tensor_parallel_size > 0:
parallel_config.expert_tensor_parallel_size = ascend_config.expert_tensor_parallel_size
# Calculate expert parallel size based on world size
parallel_config.expert_parallel_size = (
parallel_config.world_size_across_dp //
parallel_config.expert_tensor_parallel_size)
if model_config is None:
logger.warning("Model config is missing. This may indicate "
"that we are running a test case")

@@ -20,9 +20,9 @@ from typing import Any, Callable, Dict, Optional
import torch
import torch_npu
from vllm.attention.backends.abstract import AttentionType
from vllm.distributed.parallel_state import get_ep_group
from vllm_ascend.attention.attention_v1 import AscendAttentionState
from vllm_ascend.distributed.parallel_state import get_ep_group
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_310p

@@ -21,10 +21,10 @@ import torch
import torch.distributed as dist
import torch_npu
from vllm.distributed import GroupCoordinator
from vllm.distributed.parallel_state import get_ep_group
import vllm_ascend.envs as envs
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.parallel_state import get_ep_group
from vllm_ascend.ops.fused_moe import select_experts
from vllm_ascend.utils import (ACL_FORMAT_FRACTAL_NZ, FusedMoEState,
dispose_tensor, get_fused_moe_state,

@@ -313,8 +313,6 @@ def update_aclgraph_sizes(vllm_config: VllmConfig) -> None:
parallel_factor = 1 + sum(size > 1 for size in [
parallel_config.data_parallel_size_local,
parallel_config.tensor_parallel_size,
parallel_config.expert_parallel_size,
parallel_config.expert_tensor_parallel_size,
])
# Calculate maximum supported batch sizes considering model architecture

@@ -41,7 +41,6 @@ from vllm.v1.worker.worker_base import WorkerBase
import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config
from vllm_ascend.device_allocator.camem import CaMemAllocator
from vllm_ascend.distributed.parallel_state import init_ascend_model_parallel
from vllm_ascend.platform import NPUPlatform
from vllm_ascend.utils import (check_kv_cache_bytes_cache_exist,
check_torchair_cache_exist,
@@ -308,18 +307,12 @@ class NPUWorker(WorkerBase):
def _init_worker_distributed_environment(self) -> None:
"""Initialize the distributed environment."""
parallel_config = self.vllm_config.parallel_config
init_distributed_environment(self.parallel_config.world_size,
self.rank, self.distributed_init_method,
self.local_rank, "hccl")
ensure_model_parallel_initialized(
self.parallel_config.tensor_parallel_size,
self.parallel_config.pipeline_parallel_size)
init_ascend_model_parallel(
parallel_config.expert_parallel_size,
parallel_config.expert_tensor_parallel_size,
parallel_config.world_size_across_dp,
)
ensure_kv_transfer_initialized(self.vllm_config)
def _init_profiler(self):