[CI] Refactor multi-node CI (#3487)

### What this PR does / why we need it?
Refactor the multi-node CI test cases. The goal of this PR is to make it
easier to add multi-node CI cases: developers can add a multi-node
cluster model test case (including prefill-decode disaggregation) simply
by adding a new YAML configuration file.
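
For illustration, a new test case is just one more YAML file under `tests/e2e/nightly/multi_node/config/models/` — a trimmed sketch following the schema introduced in this PR (names and values here are placeholders, not a tested configuration):

```yaml
test_name: "test MyModel multi-dp"   # hypothetical case name
model: "org/MyModel"                 # hypothetical model id
num_nodes: 2
npu_per_node: 16
env_common:
  SERVER_PORT: 8080
deployment:
  -
    local_index: 0
    master_index: 0
    headless: false
    server_cmd: >
      vllm serve "org/MyModel" --host 0.0.0.0 --port $SERVER_PORT
  -
    local_index: 1
    master_index: 0
    headless: true
    server_cmd: >
      vllm serve "org/MyModel" --headless
benchmarks:
  perf:
    case_type: performance
  acc:
    case_type: accuracy
```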
### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.11.0rc3
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.0

---------

Signed-off-by: wangli <wangli858794774@gmail.com>
Li Wang
2025-10-17 09:04:31 +08:00
committed by GitHub
parent ccb6fb9ec1
commit 4c4a8458a5
18 changed files with 632 additions and 437 deletions

View File

@@ -102,6 +102,15 @@ jobs:
           wait $LOG_PID || true
           kill $MONITOR_PID || true
+      - name: Generate summary
+        if: always()
+        run: |
+          if [ -f "/root/.cache/test_summary.md" ]; then
+            cat /root/.cache/test_summary.md >> "$GITHUB_STEP_SUMMARY"
+          else
+            echo "No summary file found." >> "$GITHUB_STEP_SUMMARY"
+          fi
       - name: Post process
         if: always()
         run: |

View File

@@ -66,16 +66,16 @@ Install the relevant dependencies. The installation of Go is not required.
 ```shell
 cd Mooncake
-bash dependencies.sh
+bash dependencies.sh -y
 ```
 Install mpi
 ```shell
-apt purge mpich libmpich-dev
-apt purge openmpi-bin
-apt purge openmpi-bin libopenmpi-dev
-apt install mpich libmpich-dev
+apt purge mpich libmpich-dev -y
+apt purge openmpi-bin -y
+apt purge openmpi-bin libopenmpi-dev -y
+apt install mpich libmpich-dev -y
 export CPATH=/usr/lib/aarch64-linux-gnu/mpich/include/:$CPATH
 export CPATH=/usr/lib/aarch64-linux-gnu/openmpi/lib:$CPATH
 ```

View File

@@ -205,7 +205,7 @@ vllm serve /models/deepseek_r1_w8a8 \
 Run proxy server on the first node:
 ```shell
 cd /vllm-workspace/vllm-ascend/examples/disaggregated_prefill_v1
-python toy_proxy_server.py --host 172.19.32.175 --port 1025 --prefiller-hosts 172.19.241.49 --prefiller-port 20002 --decoder-hosts 172.19.123.51 --decoder-ports 20002
+python load_balance_proxy_server_example.py --host 172.19.32.175 --port 1025 --prefiller-hosts 172.19.241.49 --prefiller-port 20002 --decoder-hosts 172.19.123.51 --decoder-ports 20002
 ```
 Verification
Verification Verification

View File

@@ -21,6 +21,10 @@ parser.add_argument("--local-device-ids",
                     type=str,
                     required=False,
                     help="local device ids")
+parser.add_argument("--ranktable-path",
+                    type=str,
+                    default="./ranktable.json",
+                    help="output rank table path")
 args = parser.parse_args()
 local_host = args.local_host
 prefill_device_cnt = args.prefill_device_cnt
@@ -130,7 +134,8 @@ ranktable = {
 }
 if local_rank == '0':
-    with open("ranktable.json", "w") as f:
+    os.makedirs(os.path.dirname(args.ranktable_path), exist_ok=True)
+    with open(args.ranktable_path, "w") as f:
         json.dump(ranktable, f, indent=4)
     print("gen ranktable.json done")

View File

@@ -21,6 +21,7 @@ import contextlib
 import gc
 import json
 import os
+import shlex
 import subprocess
 import sys
 import time
@@ -40,14 +41,11 @@ from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer,
 from transformers.models.auto.auto_factory import _BaseAutoModelClass
 from vllm import LLM, SamplingParams
 from vllm.config.model import TaskOption, _get_and_verify_dtype
-from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.entrypoints.cli.serve import ServeSubcommand
 from vllm.inputs import TextPrompt
-from vllm.model_executor.model_loader import get_model_loader
 from vllm.outputs import RequestOutput
 from vllm.platforms import current_platform
 from vllm.transformers_utils.utils import maybe_model_redirect
-from vllm.utils import FlexibleArgumentParser, get_open_port
+from vllm.utils import get_open_port

 from tests.e2e.model_utils import (TokensTextLogprobs,
                                    TokensTextLogprobsPromptLogprobs)
@@ -91,7 +89,7 @@ def cleanup_dist_env_and_memory(shutdown_ray: bool = False):
 class RemoteOpenAIServer:
     DUMMY_API_KEY = "token-abc123"  # vLLM's OpenAI server does not need API key

-    def _start_server(self, model: str, vllm_serve_args: list[str],
+    def _start_server(self, model: str, server_cmd: list[str],
                       env_dict: Optional[dict[str, str]]) -> None:
         """Subclasses override this method to customize server process launch
         """
@@ -102,7 +100,7 @@ class RemoteOpenAIServer:
         if env_dict is not None:
             env.update(env_dict)
         self.proc: subprocess.Popen = subprocess.Popen(
-            ["vllm", "serve", model, *vllm_serve_args],
+            server_cmd,
             env=env,
             stdout=sys.stdout,
             stderr=sys.stderr,
@@ -110,15 +108,19 @@ class RemoteOpenAIServer:
     def __init__(self,
                  model: str,
-                 vllm_serve_args: list[str],
+                 vllm_serve_args: Union[list[str], str],
                  *,
                  server_host: str = "0.0.0.0",
                  server_port: int = 8080,
                  env_dict: Optional[dict[str, str]] = None,
-                 seed: Optional[int] = 0,
+                 seed: Optional[int] = None,
                  auto_port: bool = True,
                  max_wait_seconds: Optional[float] = None,
                  override_hf_configs: Optional[dict[str, Any]] = None) -> None:
+        if isinstance(vllm_serve_args, str):
+            vllm_serve_args = shlex.split(vllm_serve_args)
+        else:
+            vllm_serve_args = ["vllm", "serve", model, *vllm_serve_args]
         if auto_port:
             if "-p" in vllm_serve_args or "--port" in vllm_serve_args:
                 raise ValueError("You have manually specified the port "
@@ -142,32 +144,8 @@ class RemoteOpenAIServer:
                 "--hf-overrides",
                 json.dumps(override_hf_configs)
             ]
-
-        parser = FlexibleArgumentParser(
-            description="vLLM's remote OpenAI server.")
-        subparsers = parser.add_subparsers(required=False, dest="subparser")
-        parser = ServeSubcommand().subparser_init(subparsers)
-        args = parser.parse_args([*vllm_serve_args])
-        self.uds = args.uds
-        if args.uds:
-            self.host = None
-            self.port = None
-        else:
-            self.host = str(server_host)
-            self.port = int(server_port)
-        self.show_hidden_metrics = \
-            args.show_hidden_metrics_for_version is not None
-
-        # download the model before starting the server to avoid timeout
-        is_local = os.path.isdir(model)
-        if not is_local:
-            engine_args = AsyncEngineArgs.from_cli_args(args)
-            model_config = engine_args.create_model_config()
-            load_config = engine_args.create_load_config()
-            model_loader = get_model_loader(load_config)
-            model_loader.download_model(model_config)
+        self.host = str(server_host)
+        self.port = int(server_port)

         self._start_server(model, vllm_serve_args, env_dict)
         max_wait_seconds = max_wait_seconds or 7200
@@ -195,11 +173,7 @@ class RemoteOpenAIServer:
         This is for headless mode, where the api server
         process only exists in the leader node.
         """
-        if self.uds:
-            client = httpx.Client(transport=httpx.HTTPTransport(uds=self.uds))
-        else:
-            client = requests
+        client = requests
         try:
             while True:
                 try:
@@ -216,8 +190,7 @@ class RemoteOpenAIServer:
     def _wait_for_server(self, *, url: str, timeout: float):
         # run health check
         start = time.time()
-        client = (httpx.Client(transport=httpx.HTTPTransport(
-            uds=self.uds)) if self.uds else requests)
+        client = requests
         while True:
             try:
                 if client.get(url).status_code == 200:
@@ -231,15 +204,14 @@ class RemoteOpenAIServer:
             if result is not None and result != 0:
                 raise RuntimeError("Server exited unexpectedly.") from None
-            time.sleep(1)
+            time.sleep(5)
             if time.time() - start > timeout:
                 raise RuntimeError(
                     "Server failed to start in time.") from None

     @property
     def url_root(self) -> str:
-        return (f"http://{self.uds.split('/')[-1]}"
-                if self.uds else f"http://{self.host}:{self.port}")
+        return f"http://{self.host}:{self.port}"

     def url_for(self, *parts: str) -> str:
         return self.url_root + "/" + "/".join(parts)
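
A hedged sketch of the two calling conventions the refactored `RemoteOpenAIServer` now accepts — the classic argument list (internally prefixed with `vllm serve <model>`) and a full command string (tokenized with `shlex.split`), which is what lets the YAML configs pass `server_cmd` through unchanged (model name and args below are illustrative only):

```python
from tests.e2e.conftest import RemoteOpenAIServer

# List form: extra CLI args only; "vllm serve <model>" is prepended internally.
with RemoteOpenAIServer("Qwen/Qwen3-235B-A22B",
                        ["--tensor-parallel-size", "8"]) as server:
    print(server.url_root)

# String form: a complete command, e.g. the server_cmd field from a YAML config.
cmd = 'vllm serve "Qwen/Qwen3-235B-A22B" --tensor-parallel-size 8 --port 8080'
with RemoteOpenAIServer("Qwen/Qwen3-235B-A22B", cmd,
                        server_port=8080, auto_port=False) as server:
    print(server.url_root)
```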

View File

@@ -1,43 +0,0 @@
[
    {
        "test_name": "test_deepseek_v3",
        "disaggregate_prefill": false,
        "enable_multithread_load": false,
        "num_nodes": 2,
        "server_parameters": {
            "leader_config": {
                "model": "vllm-ascend/DeepSeek-V3-W8A8",
                "quantization": "ascend",
                "additional_config": {
                    "ascend_scheduler_config": {
                        "enabled": true
                    },
                    "torchair_graph_config": {
                        "enabled": true
                    }
                }
            },
            "worker_config": {
                "model": "vllm-ascend/DeepSeek-V3-W8A8",
                "quantization": "ascend",
                "additional_config": {
                    "ascend_scheduler_config": {
                        "enabled": true
                    },
                    "torchair_graph_config": {
                        "enabled": true
                    }
                }
            }
        },
        "client_parameters": {
            "model": "vllm-ascend/DeepSeek-V3-W8A8",
            "backend": "vllm",
            "dataset_name": "sharegpt",
            "dataset_path": "/root/.cache/datasets/ShareGPT_V3_unfiltered_cleaned_split.json",
            "num_prompts": 200,
            "request_rate": 1
        },
        "accuracy_parameters": {}
    }
]

View File

@@ -1,204 +0,0 @@
import json
import logging
import os
from dataclasses import dataclass, field, fields
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, TypeVar, Union

from tests.e2e.multi_node.config.utils import (get_avaliable_port,
                                               get_leader_ip,
                                               get_net_interface)

LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

CONFIG_PATH = Path("tests/e2e/multi_node/config/config.json")

T = TypeVar("T", bound="BaseConfig")


# =========================
# Base Config
# =========================
@dataclass
class BaseConfig:
    model: str = "vllm-ascend/DeepSeek-V3-W8A8"
    _extra_fields: Optional[Dict[str, Any]] = None

    @classmethod
    def from_config(cls: Type[T], data: dict[str, Any]) -> T:
        """Create config instance from dict, keeping unknown fields."""
        field_names = {f.name for f in fields(cls)}
        valid_fields = {k: v for k, v in data.items() if k in field_names}
        extra_fields = {k: v for k, v in data.items() if k not in field_names}
        obj = cls(**valid_fields)
        obj._extra_fields = extra_fields or {}
        return obj

    def to_list(self) -> List[str]:
        """Convert all fields (including _extra_fields) to CLI arguments."""
        args: List[str] = []
        all_items = {**vars(self), **(self._extra_fields or {})}
        for key, value in all_items.items():
            if key in ("model", "_extra_fields") or value in (None, "", [],
                                                              {}):
                continue
            key = key.replace("_", "-")
            if isinstance(value, bool):
                if value:
                    args.append(f"--{key}")
            elif isinstance(value, dict):
                args += [f"--{key}", json.dumps(value, ensure_ascii=False)]
            else:
                args += [f"--{key}", str(value)]
        return args


# =========================
# Server Config
# =========================
@dataclass
class ServerConfig(BaseConfig):
    host: str = "0.0.0.0"
    port: int = 8080
    trust_remote_code: bool = True
    enable_expert_parallel: bool = True
    gpu_memory_utilization: float = 0.9
    headless: bool = False
    quantization: Optional[str] = None
    tensor_parallel_size: int = 8
    max_model_len: int = 8192
    max_num_batched_token: int = 8192
    data_parallel_size: int = 4
    data_parallel_size_local: int = 2
    data_parallel_start_rank: int = 0
    data_parallel_rpc_port: int = 13389
    data_parallel_address: Optional[str] = None
    kv_transfer_config: Optional[Dict[str, Any]] = None
    additional_config: Optional[Dict[str, Any]] = None

    def init_dp_param(
        self,
        is_leader: bool,
        is_disaggregate_prefill: bool,
        dp_size: int,
        world_size: int,
    ) -> None:
        """Initialize distributed parallel parameters."""
        iface = get_net_interface()
        if iface is None:
            raise RuntimeError("No available network interface found")
        self.data_parallel_address = iface[0]
        if is_disaggregate_prefill:
            self.data_parallel_start_rank = 0
            return
        if not is_leader:
            self.headless = True
            self.data_parallel_start_rank = dp_size // world_size
            self.data_parallel_address = get_leader_ip()


@dataclass
class PerfConfig(BaseConfig):
    pass


@dataclass
class AccuracyConfig:
    prompt: str
    expected_output: str


# =========================
# MultiNode Config
# =========================
@dataclass
class MultiNodeConfig:
    test_name: str = "Unnamed Test"
    disaggregate_prefill: bool = False
    enable_multithread_load: bool = True
    world_size: int = 2
    server_host: str = "0.0.0.0"
    server_port: int = 8888
    server_config: ServerConfig = field(default_factory=ServerConfig)
    perf_config: Optional[PerfConfig] = None
    accuracy_config: Optional[AccuracyConfig] = None

    @classmethod
    def from_config(cls, cfg: Dict[str, Any]) -> "MultiNodeConfig":
        """Create a MultiNodeConfig from raw dict."""
        num_nodes = cfg.get("num_nodes", 2)
        is_disaggregate_prefill = cfg.get("disaggregate_prefill", False)
        node_index = int(os.getenv("LWS_WORKER_INDEX", 0))
        is_leader = node_index == 0

        # server config
        server_cfg_data = cfg.get("server_parameters", {})
        if not server_cfg_data:
            raise ValueError("Missing required key: 'server_parameters'")
        role_key = "leader_config" if is_leader else "worker_config"
        server_cfg_dict = server_cfg_data.get(role_key, {})
        server_cfg: ServerConfig = ServerConfig.from_config(server_cfg_dict)
        if cfg.get("enable_multithread_load"):
            server_cfg.model_loader_extra_config = {  # type: ignore[attr-defined]
                "enable_multithread_load": True,
                "num_threads": 8,
            }

        # distributed param init
        server_cfg.init_dp_param(
            is_leader=is_leader,
            is_disaggregate_prefill=is_disaggregate_prefill,
            dp_size=server_cfg.data_parallel_size,
            world_size=num_nodes,
        )

        perf_cfg: Optional[PerfConfig] = (PerfConfig.from_config(
            cfg.get("client_parameters", {})) if cfg.get("client_parameters")
                                          else None)

        # network info
        leader_cfg = server_cfg_data.get("leader_config", {})
        server_host = get_leader_ip()
        server_port = (get_avaliable_port() if is_disaggregate_prefill else
                       leader_cfg.get("port", 8080))

        return cls(
            test_name=str(cfg.get("test_name", "Unnamed Test")),
            disaggregate_prefill=is_disaggregate_prefill,
            enable_multithread_load=cfg.get("enable_multithread_load", False),
            world_size=num_nodes,
            server_config=server_cfg,
            perf_config=perf_cfg,
            server_host=server_host,
            server_port=server_port,
        )


# =========================
# Loader
# =========================
def load_configs(
        path: Union[str, Path] = CONFIG_PATH) -> List[MultiNodeConfig]:
    """Load one or multiple configs from JSON file."""
    path = Path(path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {path}")
    raw = json.loads(path.read_text())
    configs_data = raw if isinstance(raw, list) else [raw]
    configs = []
    for idx, item in enumerate(configs_data):
        try:
            configs.append(MultiNodeConfig.from_config(item))
        except Exception as e:
            LOG.exception(f"Failed to parse config #{idx}: {e}")
            raise
    return configs

View File

@@ -1,68 +0,0 @@
import os
import socket
import subprocess
from typing import Optional, Tuple

import psutil


def get_leader_ip():
    leader_dns = os.getenv("LWS_LEADER_ADDRESS")
    assert leader_dns is not None, "cannot find leader address"
    return socket.gethostbyname(leader_dns)


def get_avaliable_port(start_port: int = 6000, end_port: int = 7000) -> int:
    import socket
    for port in range(start_port, end_port):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("", port))
                return port
            except OSError:
                continue
    raise RuntimeError("No available port found")


def get_net_interface(ip: Optional[str] = None) -> Optional[Tuple[str, str]]:
    """
    Returns specified IP and its network interface.
    If no IP is provided, uses the first from hostname -I.
    """
    if ip is None:
        ips = subprocess.check_output(["hostname",
                                       "-I"]).decode().strip().split()
        if not ips:
            return None
        ip = ips[0]
    for iface, addrs in psutil.net_if_addrs().items():
        for addr in addrs:
            if addr.family == socket.AF_INET and addr.address == ip:
                return ip, iface
    return None


def get_default_envs() -> dict[str, str]:
    """Returns default network and system environment variables."""
    result = get_net_interface()
    if result is None:
        raise RuntimeError("Failed to get default network IP and interface")
    ip, nic_name = result
    return {
        "HCCL_IF_IP": ip,
        "GLOO_SOCKET_IFNAME": nic_name,
        "TP_SOCKET_IFNAME": nic_name,
        "HCCL_SOCKET_IFNAME": nic_name,
        "OMP_PROC_BIND": "false",
        "OMP_NUM_THREADS": "100",
        "VLLM_USE_V1": "1",
        "HCCL_BUFFSIZE": "1024",
        "VLLM_USE_MODELSCOPE": "true",
        "NUMEXPR_MAX_THREADS": "100",
    }


def generate_ranktable():
    pass

View File

@@ -1,49 +0,0 @@
import subprocess

import pytest

from tests.e2e.conftest import RemoteOpenAIServer
from tests.e2e.multi_node.config.multi_node_config import (MultiNodeConfig,
                                                           load_configs)
from tests.e2e.multi_node.config.utils import get_default_envs

configs = load_configs()


def get_benchmark_cmd(model: str, base_url: str, args: list[str]):
    """vllm bench serve <model> --base-url <url> ..."""
    return [
        "vllm", "bench", "serve", "--model", model, "--base-url", base_url
    ] + args


@pytest.mark.parametrize("config", configs)
def test_multi_dp(config: MultiNodeConfig) -> None:
    env_dict = get_default_envs()
    server_config = config.server_config
    perf_config = config.perf_config
    model_name = server_config.model
    assert model_name is not None, "Model name must be specified"
    server_args = server_config.to_list()
    with RemoteOpenAIServer(
            model_name,
            server_args,
            server_host=config.server_host,
            server_port=config.server_port,
            env_dict=env_dict,
            auto_port=False,
            seed=1024,
            max_wait_seconds=1000,
    ) as remote_server:
        base_url = remote_server.url_root
        assert perf_config is not None, "Perf config must be specified for perf tests"
        perf_cmd = get_benchmark_cmd(server_config.model, base_url,
                                     perf_config.to_list())
        if server_config.headless:
            remote_server.hang_until_terminated()
        else:
            # run perf benchmark
            subprocess.run(perf_cmd, check=True)

View File

@@ -0,0 +1,126 @@
# For disaggregated mode, set disaggregated_prefill.enabled: true and set the following parameters:
#   prefiller_host_index: the hosts index of the nodes running prefillers
#   decoder_host_index: the hosts index of the nodes running decoders
# Suppose we have **4 nodes** running a 2P1D setup (2 Prefillers + 1 Decoder):
# ┌───────────────┬───────────────┬───────────────┬───────────────┐
# │     node0     │     node1     │     node2     │     node3     │
# │ Prefiller #1  │ Prefiller #2  │    Decoder    │    Decoder    │
# └───────────────┴───────────────┴───────────────┴───────────────┘
# For the prefiller nodes, the hosts are node0 and node1.
# For the decoder nodes, we only have 1 decoder instance (dp+tp+ep spans node2 and node3, where node3 runs in headless mode).
# So the prefiller_host_index is [0, 1], and the decoder_host_index is [2].
test_name: "test DeepSeek-V3 disaggregated_prefill"
model: "vllm-ascend/DeepSeek-V3-W8A8"
num_nodes: 2
npu_per_node: 16
env_common:
  VLLM_USE_MODELSCOPE: true
  OMP_PROC_BIND: false
  OMP_NUM_THREADS: 100
  HCCL_BUFFSIZE: 1024
  SERVER_PORT: 8080
disaggregated_prefill:
  enabled: true
  prefiller_host_index: [0]
  decoder_host_index: [1]
deployment:
  -
    local_index: 0
    master_index: 0
    headless: false
    env_extend:
    server_cmd: >
      vllm serve "vllm-ascend/DeepSeek-V3-W8A8"
      --host 0.0.0.0
      --port $SERVER_PORT
      --data-parallel-size 2
      --data-parallel-size-local 2
      --tensor-parallel-size 8
      --seed 1024
      --enforce-eager
      --enable-expert-parallel
      --max-num-seqs 16
      --max-model-len 8192
      --max-num-batched-tokens 8192
      --quantization ascend
      --trust-remote-code
      --no-enable-prefix-caching
      --gpu-memory-utilization 0.9
      --kv-transfer-config
      '{"kv_connector": "MooncakeConnector",
        "kv_role": "kv_producer",
        "kv_port": "30000",
        "engine_id": "0",
        "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
        "kv_connector_extra_config": {
          "prefill": {
            "dp_size": 2,
            "tp_size": 8
          },
          "decode": {
            "dp_size": 2,
            "tp_size": 8
          }
        }
      }'
  -
    local_index: 1
    master_index: 0
    headless: true
    env_extend:
    server_cmd: >
      vllm serve "vllm-ascend/DeepSeek-V3-W8A8"
      --host 0.0.0.0
      --port $SERVER_PORT
      --data-parallel-size 2
      --data-parallel-size-local 2
      --tensor-parallel-size 8
      --seed 1024
      --quantization ascend
      --max-num-seqs 16
      --max-model-len 8192
      --max-num-batched-tokens 8192
      --enable-expert-parallel
      --trust-remote-code
      --no-enable-prefix-caching
      --gpu-memory-utilization 0.9
      --additional-config '{"torchair_graph_config":{"enabled":true}}'
      --kv-transfer-config
      '{"kv_connector": "MooncakeConnector",
        "kv_role": "kv_consumer",
        "kv_port": "30200",
        "engine_id": "1",
        "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
        "kv_connector_extra_config": {
          "prefill": {
            "dp_size": 2,
            "tp_size": 8
          },
          "decode": {
            "dp_size": 2,
            "tp_size": 8
          }
        }
      }'
benchmarks:
  perf:
    case_type: performance
    dataset_path: vllm-ascend/GSM8K-in3500-bs400
    request_conf: vllm_api_stream_chat
    dataset_conf: gsm8k/gsm8k_gen_0_shot_cot_str_perf
    num_prompts: 1
    max_out_len: 2
    batch_size: 1
    baseline: 5
    threshold: 0.97
  acc:
    case_type: accuracy
    dataset_path: vllm-ascend/AIME2024
    request_conf: vllm_api_general_chat
    dataset_conf: aime2024/aime2024_gen_0_shot_chat_prompt
    max_out_len: 10
    batch_size: 32
    baseline: 1
    threshold: 1
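
Each YAML under `tests/e2e/nightly/multi_node/config/models/` becomes a CI case automatically; to reproduce a single case by hand, you can mirror what `run.sh` does (the filename below is the default assumed by `MultiNodeConfig.from_yaml`):

```shell
export CONFIG_YAML_PATH=tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml
pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py
```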

View File

@@ -0,0 +1,76 @@
test_name: "test Qwen3-235B-A22B multi-dp"
model: "Qwen/Qwen3-235B-A22B"
num_nodes: 2
npu_per_node: 16
env_common:
  VLLM_USE_MODELSCOPE: true
  OMP_PROC_BIND: false
  OMP_NUM_THREADS: 100
  HCCL_BUFFSIZE: 1024
  SERVER_PORT: 8080
deployment:
  -
    local_index: 0
    master_index: 0
    headless: false
    env_extend:
    server_cmd: >
      vllm serve "Qwen/Qwen3-235B-A22B"
      --host 0.0.0.0
      --port $SERVER_PORT
      --data-parallel-size 4
      --data-parallel-size-local 2
      --data-parallel-address $LOCAL_IP
      --data-parallel-rpc-port 13389
      --tensor-parallel-size 8
      --seed 1024
      --enable-expert-parallel
      --max-num-seqs 16
      --max-model-len 8192
      --max-num-batched-tokens 8192
      --trust-remote-code
      --no-enable-prefix-caching
      --gpu-memory-utilization 0.9
  -
    local_index: 1
    master_index: 0
    headless: true
    env_extend:
    server_cmd: >
      vllm serve "Qwen/Qwen3-235B-A22B"
      --headless
      --data-parallel-size 4
      --data-parallel-size-local 2
      --data-parallel-start-rank 2
      --data-parallel-address $MASTER_IP
      --data-parallel-rpc-port 13389
      --tensor-parallel-size 8
      --seed 1024
      --max-num-seqs 16
      --max-model-len 8192
      --max-num-batched-tokens 8192
      --enable-expert-parallel
      --trust-remote-code
      --no-enable-prefix-caching
      --gpu-memory-utilization 0.9
benchmarks:
  perf:
    case_type: performance
    dataset_path: vllm-ascend/GSM8K-in3500-bs400
    request_conf: vllm_api_stream_chat
    dataset_conf: gsm8k/gsm8k_gen_0_shot_cot_str_perf
    num_prompts: 1
    max_out_len: 2
    batch_size: 1
    baseline: 5
    threshold: 0.97
  acc:
    case_type: accuracy
    dataset_path: vllm-ascend/AIME2024
    request_conf: vllm_api_general_chat
    dataset_conf: aime2024/aime2024_gen_0_shot_chat_prompt
    max_out_len: 10
    batch_size: 32
    baseline: 1
    threshold: 1

View File

@@ -0,0 +1,207 @@
import logging
import os
import subprocess
from typing import Optional

import regex as re
import yaml

from tests.e2e.nightly.multi_node.config.utils import (get_avaliable_port,
                                                       get_cluster_ips,
                                                       get_cur_ip,
                                                       get_net_interface,
                                                       setup_logger)

setup_logger()
logger = logging.getLogger(__name__)

DISAGGREGATED_PREFILL_PROXY_SCRIPT = "examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py"


class MultiNodeConfig:

    def __init__(self,
                 model: str,
                 test_name: str,
                 num_nodes: int = 2,
                 npu_per_node: int = 16,
                 server_port: int = 8080,
                 headless: bool = False,
                 disaggregated_prefill: Optional[dict] = None,
                 envs: Optional[dict] = None,
                 server_cmd: str = "",
                 perf_cmd: Optional[str] = None,
                 acc_cmd: Optional[str] = None):
        self.test_name = test_name
        self.model = model
        self.num_nodes = num_nodes
        self.npu_per_node = npu_per_node
        self.envs = envs if envs is not None else {}
        self.server_port = server_port
        if disaggregated_prefill:
            self.proxy_port = get_avaliable_port()
        self.headless = headless
        self.server_cmd = server_cmd
        self.perf_cmd = perf_cmd
        self.acc_cmd = acc_cmd
        assert perf_cmd is not None, "perf_cmd must be provided"
        assert acc_cmd is not None, "acc_cmd must be provided"
        assert server_cmd is not None, "server_cmd must be provided"
        self.cur_index = os.getenv("LWS_WORKER_INDEX", 0)
        self.cur_ip = get_cur_ip()
        self.nic_name = get_net_interface(self.cur_ip)
        self.cluster_ips = get_cluster_ips(num_nodes)
        self.disaggregated_prefill = disaggregated_prefill
        self._init_dist_env()
        self.server_cmd = self._expand_env_vars(self.server_cmd, self.envs)

    def _init_dist_env(self):
        self.envs["HCCL_IF_IP"] = self.cur_ip
        self.envs["GLOO_SOCKET_IFNAME"] = self.nic_name
        self.envs["TP_SOCKET_IFNAME"] = self.nic_name
        self.envs["HCCL_SOCKET_IFNAME"] = self.nic_name
        self.envs["LOCAL_IP"] = self.cur_ip
        self.envs["NIC_NAME"] = self.nic_name
        self.envs["MASTER_IP"] = self.cluster_ips[0]
        ascend_path = "/usr/local/Ascend/ascend-toolkit/latest/python/site-packages"
        self.envs[
            "LD_LIBRARY_PATH"] = f"{ascend_path}:{self.envs.get('LD_LIBRARY_PATH', os.environ.get('LD_LIBRARY_PATH', ''))}"
        # keep the envs keys and values as strings
        str_envs = {k: str(v) for k, v in self.envs.items()}
        self.envs.clear()
        self.envs.update(str_envs)

    @staticmethod
    def _expand_env_vars(cmd: str, env: dict) -> str:
        """Expand environment variables in the command string."""
        cmd = str(cmd)
        pattern = re.compile(r"\$(\w+)|\$\{(\w+)\}")

        def replace_var(match):
            var_name = match.group(1) or match.group(2)
            return str(env.get(var_name, match.group(0)))

        return pattern.sub(replace_var, cmd)

    class _ProxyContext:

        def __init__(self, outer, proxy_script):
            self.outer = outer
            self.proxy_script = proxy_script
            self.process = None

        def __enter__(self):
            o = self.outer
            if not o.disaggregated_prefill or not o.is_master:
                logger.info(
                    "Disaggregated prefill not enabled or not master node, skipping proxy launch."
                )
                return self
            prefiller_indices = o.disaggregated_prefill["prefiller_host_index"]
            decoder_indices = o.disaggregated_prefill["decoder_host_index"]
            common_indices = set(prefiller_indices) & set(decoder_indices)
            assert not common_indices, f"Common indices found: {common_indices}"
            assert o.proxy_port is not None, "proxy_port must be set"
            prefiller_ips = [o.cluster_ips[i] for i in prefiller_indices]
            decoder_ips = [o.cluster_ips[i] for i in decoder_indices]
            prefiller_ports_list = [str(o.server_port)] * len(prefiller_ips)
            decoder_ports_list = [str(o.server_port)] * len(decoder_ips)
            proxy_cmd = [
                "python",
                self.proxy_script,
                "--host",
                o.cur_ip,
                "--port",
                str(o.proxy_port),
                "--prefiller-hosts",
                *prefiller_ips,
                "--prefiller-ports",
                *prefiller_ports_list,
                "--decoder-hosts",
                *decoder_ips,
                "--decoder-ports",
                *decoder_ports_list,
            ]
            env = os.environ.copy()
            env.update(o.envs)
            logger.info(f"Launching proxy: {' '.join(proxy_cmd)}")
            self.process = subprocess.Popen(proxy_cmd, env=env)
            o.proxy_process = self.process
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            if self.process:
                logger.info("Terminating proxy server process...")
                try:
                    self.process.terminate()
                    self.process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    logger.warning(
                        "Proxy process did not terminate, killing it...")
                    self.process.kill()
                logger.info("Proxy server process terminated.")

    def launch_server_proxy(self, proxy_script: str):
        """Return a context manager that launches the proxy server if disaggregated prefill is enabled."""
        return self._ProxyContext(self, proxy_script)

    @classmethod
    def from_yaml(cls, yaml_path: Optional[str] = None):
        if not yaml_path:
            yaml_path = os.getenv(
                "CONFIG_YAML_PATH",
                "tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml")
        with open(yaml_path, 'r') as file:
            config_data = yaml.safe_load(file)
        test_name = config_data.get("test_name", "default_test")
        model = config_data.get("model", "default_model")
        envs = config_data.get("env_common", {})
        num_nodes = config_data.get("num_nodes", 2)
        npu_per_node = config_data.get("npu_per_node", 16)
        disaggregated_prefill = config_data.get("disaggregated_prefill")
        # If disaggregated_prefill is set, override server_port to an available port for proxy running
        server_port = config_data.get("server_port", 8080)
        deployments = config_data.get("deployment", [])
        assert len(deployments) == num_nodes, \
            f"Number of deployments ({len(deployments)}) must match num_nodes ({num_nodes})"
        for deployment in deployments:
            if deployment.get("local_index") == int(
                    os.getenv("LWS_WORKER_INDEX", 0)):
                envs_extend = deployment.get("env_extend", {})
                if envs_extend:
                    envs.update(envs_extend)
                server_cmd = deployment.get("server_cmd")
                headless = deployment.get("headless", False)
                break
        benchmarks = config_data.get("benchmarks", {})
        assert benchmarks is not None, "benchmarks must be provided"
        perf_cmd = benchmarks["perf"]
        acc_cmd = benchmarks["acc"]
        return cls(model=model,
                   test_name=test_name,
                   num_nodes=num_nodes,
                   npu_per_node=npu_per_node,
                   envs=envs,
                   server_port=server_port,
                   headless=headless,
                   disaggregated_prefill=disaggregated_prefill,
                   server_cmd=server_cmd,
                   perf_cmd=perf_cmd,
                   acc_cmd=acc_cmd)

    @property
    def world_size(self):
        return self.num_nodes * self.npu_per_node

    @property
    def is_master(self):
        return int(self.cur_index) == 0

View File

@@ -0,0 +1,95 @@
import logging
import os
import socket
from contextlib import contextmanager
from typing import Optional

import psutil

# import torch.distributed as dist


@contextmanager
def temp_env(env_dict):
    old_env = {}
    for k, v in env_dict.items():
        old_env[k] = os.environ.get(k)
        os.environ[k] = str(v)
    try:
        yield
    finally:
        for k, v in old_env.items():
            if v is None:
                os.environ.pop(k, None)
            else:
                os.environ[k] = v


# @contextmanager
# def dist_group(backend="gloo"):
#     if dist.is_initialized():
#         yield
#         return
#     dist.init_process_group(backend=backend)
#     try:
#         yield
#     finally:
#         dist.destroy_process_group()


def get_cluster_ips(word_size: int = 2) -> list[str]:
    """
    Returns the IP addresses of all nodes in the cluster.
    0: leader
    1~N-1: workers
    """
    leader_dns = os.getenv("LWS_LEADER_ADDRESS")
    if not leader_dns:
        raise RuntimeError("LWS_LEADER_ADDRESS is not set")
    cluster_dns = [leader_dns]
    for i in range(1, word_size):
        cur_dns = f"vllm-0-{i}.vllm.vllm-project"
        cluster_dns.append(cur_dns)
    return [socket.gethostbyname(dns) for dns in cluster_dns]


def get_avaliable_port(start_port: int = 6000, end_port: int = 7000) -> int:
    import socket
    for port in range(start_port, end_port):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("", port))
                return port
            except OSError:
                continue
    raise RuntimeError("No available port found")


def get_cur_ip() -> str:
    """Returns the current machine's IP address."""
    return socket.gethostbyname_ex(socket.gethostname())[2][0]


def get_net_interface(ip: Optional[str] = None) -> Optional[str]:
    """
    Returns the network interface bound to the specified IP.
    If no IP is provided, uses the current machine's IP.
    """
    if ip is None:
        ip = get_cur_ip()
    for iface, addrs in psutil.net_if_addrs().items():
        for addr in addrs:
            if addr.family == socket.AF_INET and addr.address == ip:
                return iface
    return None


def setup_logger():
    """Setup logging configuration."""
    logging.basicConfig(
        level=logging.INFO,
        format="[%(asctime)s] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
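
A quick sketch of how `temp_env` could be used in a test (hypothetical usage; the helper is not yet called in the files shown in this diff):

```python
import os

from tests.e2e.nightly.multi_node.config.utils import temp_env

# Values are stringified and set only for the duration of the with-block.
with temp_env({"HCCL_BUFFSIZE": 1024}):
    assert os.environ["HCCL_BUFFSIZE"] == "1024"
# On exit, the previous value (or absence) of each key is restored.
```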

View File

@@ -17,14 +17,8 @@ spec:
         - name: vllm-leader
           image: m.daocloud.io/quay.io/ascend/cann:8.2.rc1-a3-ubuntu22.04-py3.11
           env:
-            - name: VLLM_USE_MODELSCOPE
-              value: "true"
             - name: WORKSPACE
               value: "/root/workspace"
-            - name: WORLD_SIZE
-              value: "2"
-            - name: NPU_PER_NODE
-              value: "16"
             # Set vLLM version and vLLM-Ascend version here, once there is a new release, update here.
             - name: VLLM_VERSION
               value: "v0.11.0"
@@ -37,6 +31,7 @@ spec:
             - -c
             - |
               bash /root/.cache/tests/run.sh
+              tail -f /dev/null
           resources:
             limits:
               huawei.com/ascend-1980: "16"
@@ -77,14 +72,8 @@ spec:
         - name: vllm-worker
           image: m.daocloud.io/quay.io/ascend/cann:8.2.rc1-a3-ubuntu22.04-py3.11
           env:
-            - name: VLLM_USE_MODELSCOPE
-              value: "true"
             - name: WORKSPACE
               value: "/root/workspace"
-            - name: WORLD_SIZE
-              value: "2"
-            - name: NPU_PER_NODE
-              value: "16"
             # Set vLLM version and vLLM-Ascend version here, once there is a new release, update here.
             - name: VLLM_VERSION
               value: "v0.11.0"
@@ -97,6 +86,7 @@ spec:
             - -c
             - |
               bash /root/.cache/tests/run.sh
+              tail -f /dev/null
           resources:
             limits:
               huawei.com/ascend-1980: "16"

View File

@@ -32,10 +32,7 @@ checkout_src() {
     #mooncake
     if [ ! -d "$SRC_DIR/Mooncake" ]; then
-        git clone https://github.com/kvcache-ai/Mooncake.git "$SRC_DIR/Mooncake"
-        cd "$SRC_DIR/Mooncake"
-        git checkout 06cc217504a6f1b0cdaa26b096b985651b262748
-        cd -
+        git clone -b pooling_async_memecpy_v1 https://github.com/AscendTransport/Mooncake "$SRC_DIR/Mooncake"
     fi
 }
@@ -62,25 +59,77 @@ install_vllm() {
 install_mooncake() {
     echo "====> Install mooncake"
-    apt-get update
+    apt-get update -y
+    apt install -y --allow-change-held-packages python3 python-is-python3
     apt-get install -y --no-install-recommends mpich libmpich-dev
     cd $SRC_DIR/Mooncake
+    sed -i '/option(USE_ASCEND_DIRECT)/s/OFF/ON/' mooncake-common/common.cmake
     bash dependencies.sh --yes
+    apt purge mpich libmpich-dev -y
+    apt purge openmpi-bin -y
+    apt purge openmpi-bin libopenmpi-dev -y
+    apt install mpich libmpich-dev -y
+    export CPATH=/usr/lib/aarch64-linux-gnu/mpich/include/:$CPATH
+    export CPATH=/usr/lib/aarch64-linux-gnu/openmpi/lib:$CPATH
     mkdir build
     cd -
     cd $SRC_DIR/Mooncake/build
     cmake ..
     make -j
     make install
+    cp mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/libascend_transport_mem.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/
+    cp mooncake-transfer-engine/src/libtransfer_engine.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/
     cd -
 }

+kill_npu_processes() {
+    pgrep python3 | xargs -r kill -9
+    pgrep VLLM | xargs -r kill -9
+    sleep 4
+}
+
 run_tests() {
     echo "====> Run tests"
-    cd "$SRC_DIR/vllm-ascend"
-    pytest -sv tests/e2e/multi_node/test_multi_dp.py
+    shopt -s nullglob
+    declare -A results
+    local total=0
+    local passed=0
+    local failed=0
+    local REPORT_FILE="/root/.cache/test_summary.md"
+    echo "# Nightly Multi-node Test Summary" > "$REPORT_FILE"
+    echo "" >> "$REPORT_FILE"
+    echo "| Config File | Result |" >> "$REPORT_FILE"
+    echo "|--------------|---------|" >> "$REPORT_FILE"
+    for file in tests/e2e/nightly/multi_node/config/models/*.yaml; do
+        export CONFIG_YAML_PATH="$file"
+        echo "Running test with config: $CONFIG_YAML_PATH"
+        if pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py; then
+            results["$file"]="✅ PASS"
+            ((passed++))
+        else
+            results["$file"]="❌ FAIL"
+            ((failed++))
+        fi
+        ((total++))
+        echo "| \`$file\` | ${results[$file]} |" >> "$REPORT_FILE"
+        echo "------------------------------------------"
+        kill_npu_processes
+    done
+    shopt -u nullglob
+    echo "" >> "$REPORT_FILE"
+    echo "## Summary" >> "$REPORT_FILE"
+    echo "- **Total:** $total" >> "$REPORT_FILE"
+    echo "- **Passed:** $passed" >> "$REPORT_FILE"
+    echo "- **Failed:** $failed" >> "$REPORT_FILE"
+    echo
+    echo "✅ Markdown report written to: $REPORT_FILE"
 }

 main() {
@@ -89,7 +138,7 @@ main() {
     checkout_src
     install_sys_dependencies
     install_vllm
-    #install_mooncake
+    install_mooncake
     run_tests
 }

View File

@@ -0,0 +1,30 @@
from tests.e2e.conftest import RemoteOpenAIServer
from tests.e2e.nightly.multi_node.config.multi_node_config import (
    DISAGGREGATED_PREFILL_PROXY_SCRIPT, MultiNodeConfig)


def test_multi_node() -> None:
    config = MultiNodeConfig.from_yaml()
    env_dict = config.envs
    # perf_cmd = config.perf_cmd
    # acc_cmd = config.acc_cmd
    server_port = config.server_port if not config.disaggregated_prefill else config.proxy_port
    server_host = config.cluster_ips[0]
    with config.launch_server_proxy(DISAGGREGATED_PREFILL_PROXY_SCRIPT):
        with RemoteOpenAIServer(
                model=config.model,
                vllm_serve_args=config.server_cmd,
                server_port=server_port,
                server_host=server_host,
                env_dict=env_dict,
                auto_port=False,
                max_wait_seconds=2000,
        ) as remote_server:
            # base_url = remote_server.url_root
            if config.is_master:
                pass
                # TODO: enable perf and acc test
                # subprocess.run(perf_cmd, check=True)
                # subprocess.run(acc_cmd, check=True)
            else:
                remote_server.hang_until_terminated()