diff --git a/.github/workflows/multi_node_test.yaml b/.github/workflows/multi_node_test.yaml
index 1d262a2..682ae90 100644
--- a/.github/workflows/multi_node_test.yaml
+++ b/.github/workflows/multi_node_test.yaml
@@ -102,6 +102,15 @@ jobs:
           wait $LOG_PID || true
           kill $MONITOR_PID || true
 
+      - name: Generate summary
+        if: always()
+        run: |
+          if [ -f "/root/.cache/test_summary.md" ]; then
+            cat /root/.cache/test_summary.md >> "$GITHUB_STEP_SUMMARY"
+          else
+            echo "No summary file found." >> "$GITHUB_STEP_SUMMARY"
+          fi
+
       - name: Post process
         if: always()
         run: |
diff --git a/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md b/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md
index 81c930b..49c175a 100644
--- a/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md
+++ b/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md
@@ -66,16 +66,16 @@ Install the relevant dependencies. The installation of Go is not required.
 
 ```shell
 cd Mooncake
-bash dependencies.sh
+bash dependencies.sh -y
 ```
 
 Install mpi
 
 ```shell
-apt purge mpich libmpich-dev
-apt purge openmpi-bin
-apt purge openmpi-bin libopenmpi-dev
-apt install mpich libmpich-dev
+apt purge mpich libmpich-dev -y
+apt purge openmpi-bin -y
+apt purge openmpi-bin libopenmpi-dev -y
+apt install mpich libmpich-dev -y
 export CPATH=/usr/lib/aarch64-linux-gnu/mpich/include/:$CPATH
 export CPATH=/usr/lib/aarch64-linux-gnu/openmpi/lib:$CPATH
 ```
diff --git a/examples/disaggregated_prefill_v1/README.md b/examples/disaggregated_prefill_v1/README.md
index fabcf6b..7a546a3 100644
--- a/examples/disaggregated_prefill_v1/README.md
+++ b/examples/disaggregated_prefill_v1/README.md
@@ -205,7 +205,7 @@ vllm serve /models/deepseek_r1_w8a8 \
 Run proxy server on the first node:
 ```shell
 cd /vllm-workspace/vllm-ascend/examples/disaggregated_prefill_v1
-python toy_proxy_server.py --host 172.19.32.175 --port 1025 --prefiller-hosts 172.19.241.49 --prefiller-port 20002 --decoder-hosts 172.19.123.51 --decoder-ports 20002
+python load_balance_proxy_server_example.py --host 172.19.32.175 --port 1025 --prefiller-hosts 172.19.241.49 --prefiller-port 20002 --decoder-hosts 172.19.123.51 --decoder-ports 20002
 ```
 
 Verification
diff --git a/examples/disaggregated_prefill_v1/gen_ranktable.py b/examples/disaggregated_prefill_v1/gen_ranktable.py
index ad86c84..98ce9f5 100644
--- a/examples/disaggregated_prefill_v1/gen_ranktable.py
+++ b/examples/disaggregated_prefill_v1/gen_ranktable.py
@@ -21,6 +21,10 @@ parser.add_argument("--local-device-ids",
                     type=str,
                     required=False,
                     help="local device ids")
+parser.add_argument("--ranktable-path",
+                    type=str,
+                    default="./ranktable.json",
+                    help="output rank table path")
 args = parser.parse_args()
 local_host = args.local_host
 prefill_device_cnt = args.prefill_device_cnt
@@ -130,7 +134,8 @@ ranktable = {
 }
 
 if local_rank == '0':
-    with open("ranktable.json", "w") as f:
+    os.makedirs(os.path.dirname(args.ranktable_path) or ".", exist_ok=True)
+    with open(args.ranktable_path, "w") as f:
         json.dump(ranktable, f, indent=4)
 
     print("gen ranktable.json done")
diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py
index fa8d2e7..452faa1 100644
--- a/tests/e2e/conftest.py
+++ b/tests/e2e/conftest.py
@@ -21,6 +21,7 @@ import contextlib
 import gc
 import json
 import os
+import shlex
 import subprocess
 import sys
 import time
@@ -40,14 +41,11 @@ from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer,
 from transformers.models.auto.auto_factory import _BaseAutoModelClass
 from vllm import LLM, SamplingParams
 from vllm.config.model import TaskOption, _get_and_verify_dtype
-from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.entrypoints.cli.serve import ServeSubcommand
 from vllm.inputs import TextPrompt
-from vllm.model_executor.model_loader import get_model_loader
 from vllm.outputs import RequestOutput
 from vllm.platforms import current_platform
 from vllm.transformers_utils.utils import maybe_model_redirect
-from vllm.utils import FlexibleArgumentParser, get_open_port
+from vllm.utils import get_open_port
 
 from tests.e2e.model_utils import (TokensTextLogprobs,
                                    TokensTextLogprobsPromptLogprobs)
@@ -91,7 +89,7 @@ def cleanup_dist_env_and_memory(shutdown_ray: bool = False):
 class RemoteOpenAIServer:
     DUMMY_API_KEY = "token-abc123"  # vLLM's OpenAI server does not need API key
 
-    def _start_server(self, model: str, vllm_serve_args: list[str],
+    def _start_server(self, model: str, server_cmd: list[str],
                       env_dict: Optional[dict[str, str]]) -> None:
         """Subclasses override this method to customize server process launch
         """
@@ -102,7 +100,7 @@ class RemoteOpenAIServer:
         if env_dict is not None:
             env.update(env_dict)
         self.proc: subprocess.Popen = subprocess.Popen(
-            ["vllm", "serve", model, *vllm_serve_args],
+            server_cmd,
             env=env,
             stdout=sys.stdout,
             stderr=sys.stderr,
@@ -110,15 +108,19 @@
 
     def __init__(self,
                  model: str,
-                 vllm_serve_args: list[str],
+                 vllm_serve_args: Union[list[str], str],
                  *,
                  server_host: str = "0.0.0.0",
                  server_port: int = 8080,
                  env_dict: Optional[dict[str, str]] = None,
-                 seed: Optional[int] = 0,
+                 seed: Optional[int] = None,
                  auto_port: bool = True,
                  max_wait_seconds: Optional[float] = None,
                  override_hf_configs: Optional[dict[str, Any]] = None) -> None:
+        if isinstance(vllm_serve_args, str):
+            vllm_serve_args = shlex.split(vllm_serve_args)
+        else:
+            vllm_serve_args = ["vllm", "serve", model, *vllm_serve_args]
         if auto_port:
             if "-p" in vllm_serve_args or "--port" in vllm_serve_args:
                 raise ValueError("You have manually specified the port "
@@ -142,32 +144,8 @@ class RemoteOpenAIServer:
                 "--hf-overrides",
                 json.dumps(override_hf_configs)
             ]
-
-        parser = FlexibleArgumentParser(
-            description="vLLM's remote OpenAI server.")
-        subparsers = parser.add_subparsers(required=False, dest="subparser")
-        parser = ServeSubcommand().subparser_init(subparsers)
-        args = parser.parse_args([*vllm_serve_args])
-        self.uds = args.uds
-        if args.uds:
-            self.host = None
-            self.port = None
-        else:
-            self.host = str(server_host)
-            self.port = int(server_port)
-
-        self.show_hidden_metrics = \
-            args.show_hidden_metrics_for_version is not None
-
-        # download the model before starting the server to avoid timeout
-        is_local = os.path.isdir(model)
-        if not is_local:
-            engine_args = AsyncEngineArgs.from_cli_args(args)
-            model_config = engine_args.create_model_config()
-            load_config = engine_args.create_load_config()
-
-            model_loader = get_model_loader(load_config)
-            model_loader.download_model(model_config)
+        self.host = str(server_host)
+        self.port = int(server_port)
 
         self._start_server(model, vllm_serve_args, env_dict)
         max_wait_seconds = max_wait_seconds or 7200
@@ -195,11 +173,7 @@ class RemoteOpenAIServer:
         This is for headless mode, where the api server process only exists
         in the leader node.
""" - if self.uds: - client = httpx.Client(transport=httpx.HTTPTransport(uds=self.uds)) - else: - client = requests - + client = requests try: while True: try: @@ -216,8 +190,7 @@ class RemoteOpenAIServer: def _wait_for_server(self, *, url: str, timeout: float): # run health check start = time.time() - client = (httpx.Client(transport=httpx.HTTPTransport( - uds=self.uds)) if self.uds else requests) + client = requests while True: try: if client.get(url).status_code == 200: @@ -231,15 +204,14 @@ class RemoteOpenAIServer: if result is not None and result != 0: raise RuntimeError("Server exited unexpectedly.") from None - time.sleep(1) + time.sleep(5) if time.time() - start > timeout: raise RuntimeError( "Server failed to start in time.") from None @property def url_root(self) -> str: - return (f"http://{self.uds.split('/')[-1]}" - if self.uds else f"http://{self.host}:{self.port}") + return f"http://{self.host}:{self.port}" def url_for(self, *parts: str) -> str: return self.url_root + "/" + "/".join(parts) diff --git a/tests/e2e/multi_node/config/config.json b/tests/e2e/multi_node/config/config.json deleted file mode 100644 index 2954881..0000000 --- a/tests/e2e/multi_node/config/config.json +++ /dev/null @@ -1,43 +0,0 @@ -[ - { - "test_name": "test_deepseek_v3", - "disaggregate_prefill": false, - "enable_multithread_load": false, - "num_nodes": 2, - "server_parameters": { - "leader_config": { - "model": "vllm-ascend/DeepSeek-V3-W8A8", - "quantization": "ascend", - "additional_config": { - "ascend_scheduler_config": { - "enabled": true - }, - "torchair_graph_config": { - "enabled": true - } - } - }, - "worker_config": { - "model": "vllm-ascend/DeepSeek-V3-W8A8", - "quantization": "ascend", - "additional_config": { - "ascend_scheduler_config": { - "enabled": true - }, - "torchair_graph_config": { - "enabled": true - } - } - } - }, - "client_parameters": { - "model": "vllm-ascend/DeepSeek-V3-W8A8", - "backend": "vllm", - "dataset_name": "sharegpt", - "dataset_path": "/root/.cache/datasets/ShareGPT_V3_unfiltered_cleaned_split.json", - "num_prompts": 200, - "request_rate": 1 - }, - "accuracy_parameters": {} - } -] diff --git a/tests/e2e/multi_node/config/multi_node_config.py b/tests/e2e/multi_node/config/multi_node_config.py deleted file mode 100644 index 2a67752..0000000 --- a/tests/e2e/multi_node/config/multi_node_config.py +++ /dev/null @@ -1,204 +0,0 @@ -import json -import logging -import os -from dataclasses import dataclass, field, fields -from pathlib import Path -from typing import Any, Dict, List, Optional, Type, TypeVar, Union - -from tests.e2e.multi_node.config.utils import (get_avaliable_port, - get_leader_ip, - get_net_interface) - -LOG = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO) - -CONFIG_PATH = Path("tests/e2e/multi_node/config/config.json") - -T = TypeVar("T", bound="BaseConfig") - - -# ========================= -# Base Config -# ========================= -@dataclass -class BaseConfig: - model: str = "vllm-ascend/DeepSeek-V3-W8A8" - _extra_fields: Optional[Dict[str, Any]] = None - - @classmethod - def from_config(cls: Type[T], data: dict[str, Any]) -> T: - """Create config instance from dict, keeping unknown fields.""" - field_names = {f.name for f in fields(cls)} - valid_fields = {k: v for k, v in data.items() if k in field_names} - extra_fields = {k: v for k, v in data.items() if k not in field_names} - obj = cls(**valid_fields) - obj._extra_fields = extra_fields or {} - return obj - - def to_list(self) -> List[str]: - """Convert all fields 
(including _extra_fields) to CLI arguments.""" - args: List[str] = [] - all_items = {**vars(self), **(self._extra_fields or {})} - - for key, value in all_items.items(): - if key in ("model", "_extra_fields") or value in (None, "", [], - {}): - continue - key = key.replace("_", "-") - - if isinstance(value, bool): - if value: - args.append(f"--{key}") - elif isinstance(value, dict): - args += [f"--{key}", json.dumps(value, ensure_ascii=False)] - else: - args += [f"--{key}", str(value)] - return args - - -# ========================= -# Server Config -# ========================= -@dataclass -class ServerConfig(BaseConfig): - host: str = "0.0.0.0" - port: int = 8080 - trust_remote_code: bool = True - enable_expert_parallel: bool = True - gpu_memory_utilization: float = 0.9 - headless: bool = False - quantization: Optional[str] = None - tensor_parallel_size: int = 8 - max_model_len: int = 8192 - max_num_batched_token: int = 8192 - data_parallel_size: int = 4 - data_parallel_size_local: int = 2 - data_parallel_start_rank: int = 0 - data_parallel_rpc_port: int = 13389 - data_parallel_address: Optional[str] = None - kv_transfer_config: Optional[Dict[str, Any]] = None - additional_config: Optional[Dict[str, Any]] = None - - def init_dp_param( - self, - is_leader: bool, - is_disaggregate_prefill: bool, - dp_size: int, - world_size: int, - ) -> None: - """Initialize distributed parallel parameters.""" - iface = get_net_interface() - if iface is None: - raise RuntimeError("No available network interface found") - self.data_parallel_address = iface[0] - - if is_disaggregate_prefill: - self.data_parallel_start_rank = 0 - return - - if not is_leader: - self.headless = True - self.data_parallel_start_rank = dp_size // world_size - self.data_parallel_address = get_leader_ip() - - -@dataclass -class PerfConfig(BaseConfig): - pass - - -@dataclass -class AccuracyConfig: - prompt: str - expected_output: str - - -# ========================= -# MultiNode Config -# ========================= -@dataclass -class MultiNodeConfig: - test_name: str = "Unnamed Test" - disaggregate_prefill: bool = False - enable_multithread_load: bool = True - world_size: int = 2 - server_host: str = "0.0.0.0" - server_port: int = 8888 - server_config: ServerConfig = field(default_factory=ServerConfig) - perf_config: Optional[PerfConfig] = None - accuracy_config: Optional[AccuracyConfig] = None - - @classmethod - def from_config(cls, cfg: Dict[str, Any]) -> "MultiNodeConfig": - """Create a MultiNodeConfig from raw dict.""" - num_nodes = cfg.get("num_nodes", 2) - is_disaggregate_prefill = cfg.get("disaggregate_prefill", False) - node_index = int(os.getenv("LWS_WORKER_INDEX", 0)) - is_leader = node_index == 0 - - # server config - server_cfg_data = cfg.get("server_parameters", {}) - if not server_cfg_data: - raise ValueError("Missing required key: 'server_parameters'") - - role_key = "leader_config" if is_leader else "worker_config" - server_cfg_dict = server_cfg_data.get(role_key, {}) - server_cfg: ServerConfig = ServerConfig.from_config(server_cfg_dict) - - if cfg.get("enable_multithread_load"): - server_cfg.model_loader_extra_config = { # type: ignore[attr-defined] - "enable_multithread_load": True, - "num_threads": 8, - } - - # distributed param init - server_cfg.init_dp_param( - is_leader=is_leader, - is_disaggregate_prefill=is_disaggregate_prefill, - dp_size=server_cfg.data_parallel_size, - world_size=num_nodes, - ) - - perf_cfg: Optional[PerfConfig] = (PerfConfig.from_config( - cfg.get("client_parameters", {})) if 
cfg.get("client_parameters") - else None) - - # network info - leader_cfg = server_cfg_data.get("leader_config", {}) - server_host = get_leader_ip() - server_port = (get_avaliable_port() if is_disaggregate_prefill else - leader_cfg.get("port", 8080)) - - return cls( - test_name=str(cfg.get("test_name", "Unnamed Test")), - disaggregate_prefill=is_disaggregate_prefill, - enable_multithread_load=cfg.get("enable_multithread_load", False), - world_size=num_nodes, - server_config=server_cfg, - perf_config=perf_cfg, - server_host=server_host, - server_port=server_port, - ) - - -# ========================= -# Loader -# ========================= -def load_configs( - path: Union[str, Path] = CONFIG_PATH) -> List[MultiNodeConfig]: - """Load one or multiple configs from JSON file.""" - path = Path(path) - if not path.exists(): - raise FileNotFoundError(f"Configuration file not found: {path}") - - raw = json.loads(path.read_text()) - configs_data = raw if isinstance(raw, list) else [raw] - - configs = [] - for idx, item in enumerate(configs_data): - try: - configs.append(MultiNodeConfig.from_config(item)) - except Exception as e: - LOG.exception(f"Failed to parse config #{idx}: {e}") - raise - return configs diff --git a/tests/e2e/multi_node/config/utils.py b/tests/e2e/multi_node/config/utils.py deleted file mode 100644 index 2e29589..0000000 --- a/tests/e2e/multi_node/config/utils.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import socket -import subprocess -from typing import Optional, Tuple - -import psutil - - -def get_leader_ip(): - leader_dns = os.getenv("LWS_LEADER_ADDRESS") - assert leader_dns is not None, "cannot find leader address" - return socket.gethostbyname(leader_dns) - - -def get_avaliable_port(start_port: int = 6000, end_port: int = 7000) -> int: - import socket - for port in range(start_port, end_port): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - try: - s.bind(("", port)) - return port - except OSError: - continue - raise RuntimeError("No available port found") - - -def get_net_interface(ip: Optional[str] = None) -> Optional[Tuple[str, str]]: - """ - Returns specified IP and its network interface. - If no IP is provided, uses the first from hostname -I. 
-    """
-    if ip is None:
-        ips = subprocess.check_output(["hostname",
-                                       "-I"]).decode().strip().split()
-        if not ips:
-            return None
-        ip = ips[0]
-
-    for iface, addrs in psutil.net_if_addrs().items():
-        for addr in addrs:
-            if addr.family == socket.AF_INET and addr.address == ip:
-                return ip, iface
-    return None
-
-
-def get_default_envs() -> dict[str, str]:
-    """Returns default network and system environment variables."""
-    result = get_net_interface()
-    if result is None:
-        raise RuntimeError("Failed to get default network IP and interface")
-    ip, nic_name = result
-
-    return {
-        "HCCL_IF_IP": ip,
-        "GLOO_SOCKET_IFNAME": nic_name,
-        "TP_SOCKET_IFNAME": nic_name,
-        "HCCL_SOCKET_IFNAME": nic_name,
-        "OMP_PROC_BIND": "false",
-        "OMP_NUM_THREADS": "100",
-        "VLLM_USE_V1": "1",
-        "HCCL_BUFFSIZE": "1024",
-        "VLLM_USE_MODELSCOPE": "true",
-        "NUMEXPR_MAX_THREADS": "100",
-    }
-
-
-def generate_ranktable():
-    pass
diff --git a/tests/e2e/multi_node/test_multi_dp.py b/tests/e2e/multi_node/test_multi_dp.py
deleted file mode 100644
index d834da2..0000000
--- a/tests/e2e/multi_node/test_multi_dp.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import subprocess
-
-import pytest
-
-from tests.e2e.conftest import RemoteOpenAIServer
-from tests.e2e.multi_node.config.multi_node_config import (MultiNodeConfig,
-                                                           load_configs)
-from tests.e2e.multi_node.config.utils import get_default_envs
-
-configs = load_configs()
-
-
-def get_benchmark_cmd(model: str, base_url: str, args: list[str]):
-    """vllm bench serve --base-url ..."""
-    return [
-        "vllm", "bench", "serve", "--model", model, "--base-url", base_url
-    ] + args
-
-
-@pytest.mark.parametrize("config", configs)
-def test_multi_dp(config: MultiNodeConfig) -> None:
-    env_dict = get_default_envs()
-
-    server_config = config.server_config
-    perf_config = config.perf_config
-    model_name = server_config.model
-    assert model_name is not None, "Model name must be specified"
-
-    server_args = server_config.to_list()
-
-    with RemoteOpenAIServer(
-            model_name,
-            server_args,
-            server_host=config.server_host,
-            server_port=config.server_port,
-            env_dict=env_dict,
-            auto_port=False,
-            seed=1024,
-            max_wait_seconds=1000,
-    ) as remote_server:
-        base_url = remote_server.url_root
-        assert perf_config is not None, "Perf config must be specified for perf tests"
-        perf_cmd = get_benchmark_cmd(server_config.model, base_url,
-                                     perf_config.to_list())
-        if server_config.headless:
-            remote_server.hang_until_terminated()
-        else:
-            # run perf benchmark
-            subprocess.run(perf_cmd, check=True)
diff --git a/tests/e2e/multi_node/__init__.py b/tests/e2e/nightly/multi_node/__init__.py
similarity index 100%
rename from tests/e2e/multi_node/__init__.py
rename to tests/e2e/nightly/multi_node/__init__.py
diff --git a/tests/e2e/multi_node/config/__init__.py b/tests/e2e/nightly/multi_node/config/__init__.py
similarity index 100%
rename from tests/e2e/multi_node/config/__init__.py
rename to tests/e2e/nightly/multi_node/config/__init__.py
diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml
new file mode 100644
index 0000000..2d29223
--- /dev/null
+++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml
@@ -0,0 +1,126 @@
+# For disaggregated mode, set disaggregated_prefill.enabled: true and set the following parameters:
+#   prefiller_host_index: the indices of the hosts running prefiller instances
+#   decoder_host_index: the indices of the hosts running decoder instances
+# Suppose we have **4 nodes** running a 2P1D setup (2 Prefillers + 1 Decoder):
+# ┌───────────────┬───────────────┬───────────────┬───────────────┐
+# │     node0     │     node1     │     node2     │     node3     │
+# │  Prefiller #1 │  Prefiller #2 │    Decoder    │    Decoder    │
+# └───────────────┴───────────────┴───────────────┴───────────────┘
+# For the prefiller nodes, the hosts are node0 and node1.
+# For the decoder nodes, there is only one decoder instance (dp+tp+ep across node2 and node3, where node3 runs in headless mode).
+# So the prefiller_host_index is [0, 1], and the decoder_host_index is [2].
+test_name: "test DeepSeek-V3 disaggregated_prefill"
+model: "vllm-ascend/DeepSeek-V3-W8A8"
+num_nodes: 2
+npu_per_node: 16
+env_common:
+  VLLM_USE_MODELSCOPE: true
+  OMP_PROC_BIND: false
+  OMP_NUM_THREADS: 100
+  HCCL_BUFFSIZE: 1024
+  SERVER_PORT: 8080
+disaggregated_prefill:
+  enabled: true
+  prefiller_host_index: [0]
+  decoder_host_index: [1]
+
+deployment:
+  -
+    local_index: 0
+    master_index: 0
+    headless: false
+    env_extend:
+    server_cmd: >
+      vllm serve "vllm-ascend/DeepSeek-V3-W8A8"
+      --host 0.0.0.0
+      --port $SERVER_PORT
+      --data-parallel-size 2
+      --data-parallel-size-local 2
+      --tensor-parallel-size 8
+      --seed 1024
+      --enforce-eager
+      --enable-expert-parallel
+      --max-num-seqs 16
+      --max-model-len 8192
+      --max-num-batched-tokens 8192
+      --quantization ascend
+      --trust-remote-code
+      --no-enable-prefix-caching
+      --gpu-memory-utilization 0.9
+      --kv-transfer-config
+      '{"kv_connector": "MooncakeConnector",
+      "kv_role": "kv_producer",
+      "kv_port": "30000",
+      "engine_id": "0",
+      "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
+      "kv_connector_extra_config": {
+      "prefill": {
+      "dp_size": 2,
+      "tp_size": 8
+      },
+      "decode": {
+      "dp_size": 2,
+      "tp_size": 8
+      }
+      }
+      }'
+
+  -
+    local_index: 1
+    master_index: 0
+    headless: true
+    env_extend:
+    server_cmd: >
+      vllm serve "vllm-ascend/DeepSeek-V3-W8A8"
+      --host 0.0.0.0
+      --port $SERVER_PORT
+      --data-parallel-size 2
+      --data-parallel-size-local 2
+      --tensor-parallel-size 8
+      --seed 1024
+      --quantization ascend
+      --max-num-seqs 16
+      --max-model-len 8192
+      --max-num-batched-tokens 8192
+      --enable-expert-parallel
+      --trust-remote-code
+      --no-enable-prefix-caching
+      --gpu-memory-utilization 0.9
+      --additional-config '{"torchair_graph_config":{"enabled":true}}'
+      --kv-transfer-config
+      '{"kv_connector": "MooncakeConnector",
+      "kv_role": "kv_consumer",
+      "kv_port": "30200",
+      "engine_id": "1",
+      "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
+      "kv_connector_extra_config": {
+      "prefill": {
+      "dp_size": 2,
+      "tp_size": 8
+      },
+      "decode": {
+      "dp_size": 2,
+      "tp_size": 8
+      }
+      }
+      }'
+benchmarks:
+  perf:
+    case_type: performance
+    dataset_path: vllm-ascend/GSM8K-in3500-bs400
+    request_conf: vllm_api_stream_chat
+    dataset_conf: gsm8k/gsm8k_gen_0_shot_cot_str_perf
+    num_prompts: 1
+    max_out_len: 2
+    batch_size: 1
+    baseline: 5
+    threshold: 0.97
+  acc:
+    case_type: accuracy
+    dataset_path: vllm-ascend/AIME2024
+    request_conf: vllm_api_general_chat
+    dataset_conf: aime2024/aime2024_gen_0_shot_chat_prompt
+    max_out_len: 10
+    batch_size: 32
+    baseline: 1
+    threshold: 1
diff --git a/tests/e2e/nightly/multi_node/config/models/Qwen3-235B-A3B.yaml b/tests/e2e/nightly/multi_node/config/models/Qwen3-235B-A3B.yaml
new file mode 100644
index 0000000..d2b2095
--- /dev/null
+++ b/tests/e2e/nightly/multi_node/config/models/Qwen3-235B-A3B.yaml
@@ -0,0 +1,76 @@
+test_name: "test Qwen3-235B-A22B multi-dp"
+model: "Qwen/Qwen3-235B-A22B"
+num_nodes: 2
+npu_per_node: 16
+env_common:
+  VLLM_USE_MODELSCOPE: true
+  OMP_PROC_BIND: false
+  OMP_NUM_THREADS: 100
+  HCCL_BUFFSIZE: 1024
+  SERVER_PORT: 8080
+
+deployment:
+  -
+    local_index: 0
+    master_index: 0
+    headless: false
+    env_extend:
+    server_cmd: >
+      vllm serve "Qwen/Qwen3-235B-A22B"
+      --host 0.0.0.0
+      --port $SERVER_PORT
+      --data-parallel-size 4
+      --data-parallel-size-local 2
+      --data-parallel-address $LOCAL_IP
+      --data-parallel-rpc-port 13389
+      --tensor-parallel-size 8
+      --seed 1024
+      --enable-expert-parallel
+      --max-num-seqs 16
+      --max-model-len 8192
+      --max-num-batched-tokens 8192
+      --trust-remote-code
+      --no-enable-prefix-caching
+      --gpu-memory-utilization 0.9
+  -
+    local_index: 1
+    master_index: 0
+    headless: true
+    env_extend:
+    server_cmd: >
+      vllm serve "Qwen/Qwen3-235B-A22B"
+      --headless
+      --data-parallel-size 4
+      --data-parallel-size-local 2
+      --data-parallel-start-rank 2
+      --data-parallel-address $MASTER_IP
+      --data-parallel-rpc-port 13389
+      --tensor-parallel-size 8
+      --seed 1024
+      --max-num-seqs 16
+      --max-model-len 8192
+      --max-num-batched-tokens 8192
+      --enable-expert-parallel
+      --trust-remote-code
+      --no-enable-prefix-caching
+      --gpu-memory-utilization 0.9
+benchmarks:
+  perf:
+    case_type: performance
+    dataset_path: vllm-ascend/GSM8K-in3500-bs400
+    request_conf: vllm_api_stream_chat
+    dataset_conf: gsm8k/gsm8k_gen_0_shot_cot_str_perf
+    num_prompts: 1
+    max_out_len: 2
+    batch_size: 1
+    baseline: 5
+    threshold: 0.97
+  acc:
+    case_type: accuracy
+    dataset_path: vllm-ascend/AIME2024
+    request_conf: vllm_api_general_chat
+    dataset_conf: aime2024/aime2024_gen_0_shot_chat_prompt
+    max_out_len: 10
+    batch_size: 32
+    baseline: 1
+    threshold: 1
diff --git a/tests/e2e/nightly/multi_node/config/multi_node_config.py b/tests/e2e/nightly/multi_node/config/multi_node_config.py
new file mode 100644
index 0000000..2106b82
--- /dev/null
+++ b/tests/e2e/nightly/multi_node/config/multi_node_config.py
@@ -0,0 +1,207 @@
+import logging
+import os
+import subprocess
+from typing import Optional
+
+import regex as re
+import yaml
+
+from tests.e2e.nightly.multi_node.config.utils import (get_avaliable_port,
+                                                       get_cluster_ips,
+                                                       get_cur_ip,
+                                                       get_net_interface,
+                                                       setup_logger)
+
+setup_logger()
+logger = logging.getLogger(__name__)
+DISAGGREGATED_PREFILL_PROXY_SCRIPT = "examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py"
+
+
+class MultiNodeConfig:
+
+    def __init__(self,
+                 model: str,
+                 test_name: str,
+                 num_nodes: int = 2,
+                 npu_per_node: int = 16,
+                 server_port: int = 8080,
+                 headless: bool = False,
+                 disaggregated_prefill: Optional[dict] = None,
+                 envs: Optional[dict] = None,
+                 server_cmd: str = "",
+                 perf_cmd: Optional[str] = None,
+                 acc_cmd: Optional[str] = None):
+        self.test_name = test_name
+        self.model = model
+        self.num_nodes = num_nodes
+        self.npu_per_node = npu_per_node
+        self.envs = envs if envs is not None else {}
+        self.server_port = server_port
+        if disaggregated_prefill:
+            self.proxy_port = get_avaliable_port()
+        self.headless = headless
+        self.server_cmd = server_cmd
+        self.perf_cmd = perf_cmd
+        self.acc_cmd = acc_cmd
+        assert perf_cmd is not None, "perf_cmd must be provided"
+        assert acc_cmd is not None, "acc_cmd must be provided"
+        assert server_cmd is not None, "server_cmd must be provided"
+
+        self.cur_index = os.getenv("LWS_WORKER_INDEX", 0)
+        self.cur_ip = get_cur_ip()
+        self.nic_name = get_net_interface(self.cur_ip)
+        self.cluster_ips = get_cluster_ips(num_nodes)
+        self.disaggregated_prefill = disaggregated_prefill
+        self._init_dist_env()
+        self.server_cmd = self._expand_env_vars(self.server_cmd, self.envs)
+
+    def _init_dist_env(self):
+        self.envs["HCCL_IF_IP"] = self.cur_ip
+        self.envs["GLOO_SOCKET_IFNAME"] = self.nic_name
+        self.envs["TP_SOCKET_IFNAME"] = self.nic_name
+        self.envs["HCCL_SOCKET_IFNAME"] = self.nic_name
+        self.envs["LOCAL_IP"] = self.cur_ip
+        self.envs["NIC_NAME"] = self.nic_name
+        self.envs["MASTER_IP"] = self.cluster_ips[0]
+        ascend_path = "/usr/local/Ascend/ascend-toolkit/latest/python/site-packages"
+        self.envs[
+            "LD_LIBRARY_PATH"] = f"{ascend_path}:{self.envs.get('LD_LIBRARY_PATH', os.environ.get('LD_LIBRARY_PATH', ''))}"
+
+        # keep the envs keys and values as strings
+        str_envs = {k: str(v) for k, v in self.envs.items()}
+        self.envs.clear()
+        self.envs.update(str_envs)
+
+    @staticmethod
+    def _expand_env_vars(cmd: str, env: dict) -> str:
+        """Expand environment variables in the command string."""
+        cmd = str(cmd)
+        pattern = re.compile(r"\$(\w+)|\$\{(\w+)\}")
+
+        def replace_var(match):
+            var_name = match.group(1) or match.group(2)
+            return str(env.get(var_name, match.group(0)))
+
+        return pattern.sub(replace_var, cmd)
+
+    class _ProxyContext:
+
+        def __init__(self, outer, proxy_script):
+            self.outer = outer
+            self.proxy_script = proxy_script
+            self.process = None
+
+        def __enter__(self):
+            o = self.outer
+            if not o.disaggregated_prefill or not o.is_master:
+                logger.info(
+                    "Disaggregated prefill not enabled or not master node, skipping proxy launch."
+                )
+                return self
+
+            prefiller_indices = o.disaggregated_prefill["prefiller_host_index"]
+            decoder_indices = o.disaggregated_prefill["decoder_host_index"]
+
+            common_indices = set(prefiller_indices) & set(decoder_indices)
+            assert not common_indices, f"Common indices found: {common_indices}"
+            assert o.proxy_port is not None, "proxy_port must be set"
+
+            prefiller_ips = [o.cluster_ips[i] for i in prefiller_indices]
+            decoder_ips = [o.cluster_ips[i] for i in decoder_indices]
+            prefiller_ports_list = [str(o.server_port)] * len(prefiller_ips)
+            decoder_ports_list = [str(o.server_port)] * len(decoder_ips)
+
+            proxy_cmd = [
+                "python",
+                self.proxy_script,
+                "--host",
+                o.cur_ip,
+                "--port",
+                str(o.proxy_port),
+                "--prefiller-hosts",
+                *prefiller_ips,
+                "--prefiller-ports",
+                *prefiller_ports_list,
+                "--decoder-hosts",
+                *decoder_ips,
+                "--decoder-ports",
+                *decoder_ports_list,
+            ]
+
+            env = os.environ.copy()
+            env.update(o.envs)
+            logger.info(f"Launching proxy: {' '.join(proxy_cmd)}")
+
+            self.process = subprocess.Popen(proxy_cmd, env=env)
+            o.proxy_process = self.process
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if self.process:
+                logger.info("Terminating proxy server process...")
+                try:
+                    self.process.terminate()
+                    self.process.wait(timeout=5)
+                except subprocess.TimeoutExpired:
+                    logger.warning(
+                        "Proxy process did not terminate, killing it...")
+                    self.process.kill()
+                logger.info("Proxy server process terminated.")
+
+    def launch_server_proxy(self, proxy_script: str):
+        """Return a context manager that launches the proxy server if disaggregated prefill is enabled."""
+        return self._ProxyContext(self, proxy_script)
+
+    @classmethod
+    def from_yaml(cls, yaml_path: Optional[str] = None):
+        if not yaml_path:
+            yaml_path = os.getenv(
+                "CONFIG_YAML_PATH",
+                "tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml")
+        with open(yaml_path, 'r') as file:
+            config_data = yaml.safe_load(file)
+        test_name = config_data.get("test_name", "default_test")
+        model = config_data.get("model", "default_model")
+        envs = config_data.get("env_common", {})
+        num_nodes = config_data.get("num_nodes", 2)
+        npu_per_node = config_data.get("npu_per_node", 16)
+        disaggregated_prefill = config_data.get("disaggregated_prefill")
+        # server_port comes from the config; when disaggregated_prefill is
+        # enabled, the proxy gets its own available port in __init__ (proxy_port)
+        server_port = config_data.get("server_port", 8080)
+
+        deployments = config_data.get("deployment", [])
+        assert len(deployments) == num_nodes, \
+            f"Number of deployments ({len(deployments)}) must match num_nodes ({num_nodes})"
+        for deployment in deployments:
+            if deployment.get("local_index") == int(
+                    os.getenv("LWS_WORKER_INDEX", 0)):
+                envs_extend = deployment.get("env_extend", {})
+                if envs_extend:
+                    envs.update(envs_extend)
+                server_cmd = deployment.get("server_cmd")
+                headless = deployment.get("headless", False)
+                break
+        benchmarks = config_data.get("benchmarks", {})
+        assert benchmarks is not None, "benchmarks must be provided"
+        perf_cmd = benchmarks["perf"]
+        acc_cmd = benchmarks["acc"]
+
+        return cls(model=model,
+                   test_name=test_name,
+                   num_nodes=num_nodes,
+                   npu_per_node=npu_per_node,
+                   envs=envs,
+                   server_port=server_port,
+                   headless=headless,
+                   disaggregated_prefill=disaggregated_prefill,
+                   server_cmd=server_cmd,
+                   perf_cmd=perf_cmd,
+                   acc_cmd=acc_cmd)
+
+    @property
+    def world_size(self):
+        return self.num_nodes * self.npu_per_node
+
+    @property
+    def is_master(self):
+        return int(self.cur_index) == 0
diff --git a/tests/e2e/nightly/multi_node/config/utils.py b/tests/e2e/nightly/multi_node/config/utils.py
new file mode 100644
index 0000000..da18af2
--- /dev/null
+++ b/tests/e2e/nightly/multi_node/config/utils.py
@@ -0,0 +1,95 @@
+import logging
+import os
+import socket
+from contextlib import contextmanager
+from typing import Optional
+
+import psutil
+
+# import torch.distributed as dist
+
+
+@contextmanager
+def temp_env(env_dict):
+    old_env = {}
+    for k, v in env_dict.items():
+        old_env[k] = os.environ.get(k)
+        os.environ[k] = str(v)
+    try:
+        yield
+    finally:
+        for k, v in old_env.items():
+            if v is None:
+                os.environ.pop(k, None)
+            else:
+                os.environ[k] = v
+
+
+# @contextmanager
+# def dist_group(backend="gloo"):
+#     if dist.is_initialized():
+#         yield
+#         return
+
+#     dist.init_process_group(backend=backend)
+#     try:
+#         yield
+#     finally:
+#         dist.destroy_process_group()
+
+
+def get_cluster_ips(world_size: int = 2) -> list[str]:
+    """
+    Returns the IP addresses of all nodes in the cluster.
+    0: leader
+    1~N-1: workers
+    """
+    leader_dns = os.getenv("LWS_LEADER_ADDRESS")
+    if not leader_dns:
+        raise RuntimeError("LWS_LEADER_ADDRESS is not set")
+    cluster_dns = [leader_dns]
+    for i in range(1, world_size):
+        cur_dns = f"vllm-0-{i}.vllm.vllm-project"
+        cluster_dns.append(cur_dns)
+    return [socket.gethostbyname(dns) for dns in cluster_dns]
+
+
+def get_avaliable_port(start_port: int = 6000, end_port: int = 7000) -> int:
+    import socket
+    for port in range(start_port, end_port):
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            try:
+                s.bind(("", port))
+                return port
+            except OSError:
+                continue
+    raise RuntimeError("No available port found")
+
+
+def get_cur_ip() -> str:
+    """Returns the current machine's IP address."""
+    return socket.gethostbyname_ex(socket.gethostname())[2][0]
+
+
+def get_net_interface(ip: Optional[str] = None) -> Optional[str]:
+    """
+    Returns the specified IP's network interface.
+    If no IP is provided, uses the current machine's IP.
+    """
+    if ip is None:
+        ip = get_cur_ip()
+
+    for iface, addrs in psutil.net_if_addrs().items():
+        for addr in addrs:
+            if addr.family == socket.AF_INET and addr.address == ip:
+                return iface
+    return None
+
+
+def setup_logger():
+    """Setup logging configuration."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format="[%(asctime)s] [%(levelname)s] %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
diff --git a/tests/e2e/multi_node/scripts/lws.yaml b/tests/e2e/nightly/multi_node/scripts/lws.yaml
similarity index 91%
rename from tests/e2e/multi_node/scripts/lws.yaml
rename to tests/e2e/nightly/multi_node/scripts/lws.yaml
index 60d7178..163412a 100644
--- a/tests/e2e/multi_node/scripts/lws.yaml
+++ b/tests/e2e/nightly/multi_node/scripts/lws.yaml
@@ -17,14 +17,8 @@ spec:
         - name: vllm-leader
           image: m.daocloud.io/quay.io/ascend/cann:8.2.rc1-a3-ubuntu22.04-py3.11
           env:
-            - name: VLLM_USE_MODELSCOPE
-              value: "true"
             - name: WORKSPACE
               value: "/root/workspace"
-            - name: WORLD_SIZE
-              value: "2"
-            - name: NPU_PER_NODE
-              value: "16"
             # Set vLLM version and vLLM-Ascend version here, once there is a new release, update here.
             - name: VLLM_VERSION
               value: "v0.11.0"
@@ -37,6 +31,7 @@ spec:
             - -c
             - |
               bash /root/.cache/tests/run.sh
+              tail -f /dev/null
           resources:
             limits:
               huawei.com/ascend-1980: "16"
@@ -77,14 +72,8 @@ spec:
         - name: vllm-worker
          image: m.daocloud.io/quay.io/ascend/cann:8.2.rc1-a3-ubuntu22.04-py3.11
           env:
-            - name: VLLM_USE_MODELSCOPE
-              value: "true"
             - name: WORKSPACE
               value: "/root/workspace"
-            - name: WORLD_SIZE
-              value: "2"
-            - name: NPU_PER_NODE
-              value: "16"
             # Set vLLM version and vLLM-Ascend version here, once there is a new release, update here.
             - name: VLLM_VERSION
               value: "v0.11.0"
@@ -97,6 +86,7 @@ spec:
            - -c
             - |
               bash /root/.cache/tests/run.sh
+              tail -f /dev/null
           resources:
             limits:
               huawei.com/ascend-1980: "16"
diff --git a/tests/e2e/multi_node/scripts/run.sh b/tests/e2e/nightly/multi_node/scripts/run.sh
similarity index 50%
rename from tests/e2e/multi_node/scripts/run.sh
rename to tests/e2e/nightly/multi_node/scripts/run.sh
index e331cea..2cf2278 100644
--- a/tests/e2e/multi_node/scripts/run.sh
+++ b/tests/e2e/nightly/multi_node/scripts/run.sh
@@ -32,10 +32,7 @@ checkout_src() {
 
     #mooncake
     if [ ! -d "$SRC_DIR/Mooncake" ]; then
-        git clone https://github.com/kvcache-ai/Mooncake.git "$SRC_DIR/Mooncake"
-        cd "$SRC_DIR/Mooncake"
-        git checkout 06cc217504a6f1b0cdaa26b096b985651b262748
-        cd -
+        git clone -b pooling_async_memecpy_v1 https://github.com/AscendTransport/Mooncake "$SRC_DIR/Mooncake"
     fi
 }
 
@@ -62,25 +59,77 @@ install_vllm() {
 
 install_mooncake() {
     echo "====> Install mooncake"
-    apt-get update
-    apt install -y --allow-change-held-packages python3 python-is-python3
+    apt-get update -y
     apt-get install -y --no-install-recommends mpich libmpich-dev
     cd $SRC_DIR/Mooncake
-    sed -i '/option(USE_ASCEND_DIRECT)/s/OFF/ON/' mooncake-common/common.cmake
     bash dependencies.sh --yes
+    apt purge mpich libmpich-dev -y
+    apt purge openmpi-bin -y
+    apt purge openmpi-bin libopenmpi-dev -y
+    apt install mpich libmpich-dev -y
+    export CPATH=/usr/lib/aarch64-linux-gnu/mpich/include/:$CPATH
+    export CPATH=/usr/lib/aarch64-linux-gnu/openmpi/lib:$CPATH
+
+    mkdir build
     cd -
     cd $SRC_DIR/Mooncake/build
     cmake ..
     make -j
     make install
+    cp mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/libascend_transport_mem.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/
+    cp mooncake-transfer-engine/src/libtransfer_engine.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/
     cd -
 }
 
+kill_npu_processes() {
+    pgrep python3 | xargs -r kill -9
+    pgrep VLLM | xargs -r kill -9
+
+    sleep 4
+}
+
 run_tests() {
     echo "====> Run tests"
-    cd "$SRC_DIR/vllm-ascend"
-    pytest -sv tests/e2e/multi_node/test_multi_dp.py
+
+    shopt -s nullglob
+    declare -A results
+    local total=0
+    local passed=0
+    local failed=0
+
+    local REPORT_FILE="/root/.cache/test_summary.md"
+    echo "# Nightly Multi-node Test Summary" > "$REPORT_FILE"
+    echo "" >> "$REPORT_FILE"
+    echo "| Config File | Result |" >> "$REPORT_FILE"
+    echo "|--------------|---------|" >> "$REPORT_FILE"
+
+    for file in tests/e2e/nightly/multi_node/config/models/*.yaml; do
+        export CONFIG_YAML_PATH="$file"
+        echo "Running test with config: $CONFIG_YAML_PATH"
+
+        if pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py; then
+            results["$file"]="✅ PASS"
+            ((passed++))
+        else
+            results["$file"]="❌ FAIL"
+            ((failed++))
+        fi
+        ((total++))
+
+        echo "| \`$file\` | ${results[$file]} |" >> "$REPORT_FILE"
+        echo "------------------------------------------"
+        kill_npu_processes
+    done
+    shopt -u nullglob
+
+    echo "" >> "$REPORT_FILE"
+    echo "## Summary" >> "$REPORT_FILE"
+    echo "- **Total:** $total" >> "$REPORT_FILE"
+    echo "- **Passed:** $passed ✅" >> "$REPORT_FILE"
+    echo "- **Failed:** $failed ❌" >> "$REPORT_FILE"
+
+    echo
+    echo "✅ Markdown report written to: $REPORT_FILE"
 }
 
 main() {
@@ -89,7 +138,7 @@ main() {
     checkout_src
     install_sys_dependencies
     install_vllm
-    #install_mooncake
+    install_mooncake
     run_tests
 }
diff --git a/tests/e2e/nightly/multi_node/test_multi_node.py b/tests/e2e/nightly/multi_node/test_multi_node.py
new file mode 100644
index 0000000..c1e85c8
--- /dev/null
+++ b/tests/e2e/nightly/multi_node/test_multi_node.py
@@ -0,0 +1,30 @@
+from tests.e2e.conftest import RemoteOpenAIServer
+from tests.e2e.nightly.multi_node.config.multi_node_config import (
+    DISAGGREGATED_PREFILL_PROXY_SCRIPT, MultiNodeConfig)
+
+
+def test_multi_node() -> None:
+    config = MultiNodeConfig.from_yaml()
+    env_dict = config.envs
+    # perf_cmd = config.perf_cmd
+    # acc_cmd = config.acc_cmd
+    server_port = config.server_port if not config.disaggregated_prefill else config.proxy_port
+    server_host = config.cluster_ips[0]
+    with config.launch_server_proxy(DISAGGREGATED_PREFILL_PROXY_SCRIPT):
+        with RemoteOpenAIServer(
+                model=config.model,
+                vllm_serve_args=config.server_cmd,
+                server_port=server_port,
+                server_host=server_host,
+                env_dict=env_dict,
+                auto_port=False,
+                max_wait_seconds=2000,
+        ) as remote_server:
+            # base_url = remote_server.url_root
+            if config.is_master:
+                pass
+                # TODO: enable perf and acc test
+                # subprocess.run(perf_cmd, check=True)
+                # subprocess.run(acc_cmd, check=True)
+            else:
+                remote_server.hang_until_terminated()
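
For reference, this is how the new nightly harness in this patch is driven end to end, as wired up in `run.sh` and `MultiNodeConfig.from_yaml()` above. A minimal sketch, assuming the LeaderWorkerSet environment from `lws.yaml` (`LWS_WORKER_INDEX`, `LWS_LEADER_ADDRESS`); outside that environment, `get_cluster_ips()` cannot resolve the worker hostnames:

```shell
# Every node runs the same entry point. Node 0 (LWS_WORKER_INDEX=0) launches the
# proxy (when disaggregated_prefill is enabled) and the leader server; the other
# nodes launch headless servers and block in hang_until_terminated().
export CONFIG_YAML_PATH="tests/e2e/nightly/multi_node/config/models/DeepSeek-V3.yaml"
pytest -sv tests/e2e/nightly/multi_node/test_multi_node.py
```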