[Nightly][Refactor] Migrate nightly single-node model tests from .py to .yaml (#6503)

### What this PR does / why we need it?
This PR refactors the nightly single-node model tests by migrating test
configurations from Python scripts to a more maintainable YAML-based
format.

| Original PR | Python (`.py`) | YAML (`.yaml`) |
| :--- | :--- | :--- |
| [#3568](https://github.com/vllm-project/vllm-ascend/pull/3568) | `test_deepseek_r1_0528_w8a8_eplb.py` | `DeepSeek-R1-0528-W8A8.yaml` |
| [#3631](https://github.com/vllm-project/vllm-ascend/pull/3631) | `test_deepseek_r1_0528_w8a8.py` | `DeepSeek-R1-0528-W8A8.yaml` |
| [#5874](https://github.com/vllm-project/vllm-ascend/pull/5874) | `test_deepseek_r1_w8a8_hbm.py` | `DeepSeek-R1-W8A8-HBM.yaml` |
| [#3908](https://github.com/vllm-project/vllm-ascend/pull/3908) | `test_deepseek_v3_2_w8a8.py` | `DeepSeek-V3.2-W8A8.yaml` |
| [#5682](https://github.com/vllm-project/vllm-ascend/pull/5682) | `test_kimi_k2_thinking.py` | `Kimi-K2-Thinking.yaml` |
| [#4111](https://github.com/vllm-project/vllm-ascend/pull/4111) | `test_mtpx_deepseek_r1_0528_w8a8.py` | `MTPX-DeepSeek-R1-0528-W8A8.yaml` |
| [#3733](https://github.com/vllm-project/vllm-ascend/pull/3733) | `test_prefix_cache_deepseek_r1_0528_w8a8.py` | `Prefix-Cache-DeepSeek-R1-0528-W8A8.yaml` |
| [#6543](https://github.com/vllm-project/vllm-ascend/pull/6543) | `test_qwen3_235b_w8a8.py` | `Qwen3-235B-A22B-W8A8.yaml` |
| [#6543](https://github.com/vllm-project/vllm-ascend/pull/6543) | `test_qwen3_235b_a22b_w8a8_eplb.py` | `Qwen3-235B-A22B-W8A8.yaml` |
| [#3973](https://github.com/vllm-project/vllm-ascend/pull/3973) | `test_qwen3_30b_w8a8.py` | `Qwen3-30B-A3B-W8A8.yaml` |
| [#3541](https://github.com/vllm-project/vllm-ascend/pull/3541) | `test_qwen3_32b_int8.py` | `Qwen3-32B-Int8.yaml` |
| [#3757](https://github.com/vllm-project/vllm-ascend/pull/3757) | `test_qwq_32b.py` | `QwQ-32B.yaml` |
| [#5616](https://github.com/vllm-project/vllm-ascend/pull/5616) | `test_qwen3_next_w8a8.py` | `Qwen3-Next-80B-A3B-Instruct-W8A8.yaml` |
| [#3541](https://github.com/vllm-project/vllm-ascend/pull/3541) | `test_qwen2_5_vl_7b.py` | `Qwen2.5-VL-7B-Instruct.yaml` |
| [#5301](https://github.com/vllm-project/vllm-ascend/pull/5301) | `test_qwen2_5_vl_7b_epd.py` | `Qwen2.5-VL-7B-Instruct-EPD.yaml` |
| [#3707](https://github.com/vllm-project/vllm-ascend/pull/3707) | `test_qwen2_5_vl_32b.py` | `Qwen2.5-VL-32B-Instruct.yaml` |
| [#3676](https://github.com/vllm-project/vllm-ascend/pull/3676) | `test_qwen3_32b_int8_a3_feature_stack3.py` | `Qwen3-32B-Int8-A3-Feature-Stack3.yaml` |
| [#3709](https://github.com/vllm-project/vllm-ascend/pull/3709) | `test_prefix_cache_qwen3_32b_int8.py` | `Prefix-Cache-Qwen3-32B-Int8.yaml` |
| [#5395](https://github.com/vllm-project/vllm-ascend/pull/5395) | `test_qwen3_next.py` | `Qwen3-Next-80B-A3B-Instruct-A2.yaml` |
| [#3474](https://github.com/vllm-project/vllm-ascend/pull/3474) | `test_qwen3_32b.py` | `Qwen3-32B.yaml` |
| [#3541](https://github.com/vllm-project/vllm-ascend/pull/3541) | `test_qwen3_32b_int8.py` | `Qwen3-32B-Int8-A2.yaml` |

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.15.0
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.15.0

---------

Signed-off-by: MrZ20 <2609716663@qq.com>
SILONG ZENG committed via GitHub on 2026-03-03 20:13:43 +08:00
commit 859f2c25b9 · parent a0a904a3d4
51 changed files with 2265 additions and 2336 deletions

View File

@@ -0,0 +1,312 @@
# vLLM-Ascend Single-Node E2E Test Developer Guide
This document is intended to help developers understand the architecture of the single-node E2E (End-to-End) testing framework in `vllm-ascend`, how to run test scripts, and how to add custom testing functionality by writing YAML configuration files and extending the code.
## 1. Test Architecture Overview
To achieve high readability, extensibility, and decoupling of configuration from code, the single-node E2E tests adopt a **"YAML-driven + Dispatcher"** architecture.
It consists of the following core components:
* **Configuration Parser (`single_node_config.py`)**: Responsible for reading `models/configs/*.yaml` files and parsing them into a strongly-typed `@dataclass` (`SingleNodeConfig`) via `SingleNodeConfigLoader`, while handling regex replacement for environment variables.
* **Service Manager Framework (`test_single_node.py` and `conftest.py`)**: Based on the `service_mode` (`openai` or `epd`), it utilizes context managers to safely start/stop server processes.
* **Test Function Dispatcher (`TEST_HANDLERS` Registry)**: Specific test logic is encapsulated into independent functions and registered in the global `TEST_HANDLERS` dictionary.
* **Performance Benchmarking (`_run_benchmarks`)**: Calls `aisbench` for performance and TTFT testing based on the `benchmarks` parameters in the YAML.
### 1.1 Key Files and Responsibilities
* `tests/e2e/nightly/single_node/models/scripts/single_node_config.py`
  * Defines `SingleNodeConfig` and `SingleNodeConfigLoader`
  * Loads YAML from `tests/e2e/nightly/single_node/models/configs/<CONFIG_YAML_PATH>`
  * Auto-assigns free ports when `envs` port entries are missing or set to `DEFAULT_PORT`
  * Expands `$VAR` / `${VAR}` placeholders inside commands via `_expand_values`
* `tests/e2e/nightly/single_node/models/scripts/test_single_node.py`
  * Declares `configs = SingleNodeConfigLoader.from_yaml_cases()` (loaded at import time)
  * `pytest.mark.parametrize("config", configs, ids=[config.name for config in configs])` runs one test per YAML case
  * Controls the server lifecycle via context managers
  * Dispatches `test_content` to functions registered in `TEST_HANDLERS`
  * Runs `aisbench` and optional benchmark assertions
### 1.2 End-to-End Flow (High Level)
```txt
pytest starts
|
v
import tests/e2e/nightly/single_node/models/scripts/test_single_node.py
|
v
configs = SingleNodeConfigLoader.from_yaml_cases()
|
v
pytest parametrize("config", configs) # one config == one test case
|
v
test_single_node(config)
|
+-----------------------------------------------+
| Start service (depends on service_mode) |
| |
| openai: start one vLLM OpenAI-compatible |
| service process |
| epd: start (encode service + decode/PD |
| service) + start proxy process |
+-----------------------------------------------+
|
v
Run test phases (test_content)
|
v
Optional benchmarks (if benchmarks is configured)
|
v
Shutdown all started processes
Notes:
- One YAML file may contain multiple test_cases; pytest will run them one by one.
- The framework is "YAML-driven": changes are typically done by editing YAML rather than editing Python code.
```
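The lifecycle half of this flow is easiest to see in code. Below is a condensed sketch of `test_single_node`, mirroring the implementation in `test_single_node.py` from this PR (setup details omitted):

```python
if config.service_mode == "epd":
    # EPD: encode + decode/PD servers plus a routing proxy
    with (
        RemoteEPDServer(vllm_serve_args=config.epd_server_cmds, env_dict=config.envs),
        DisaggEpdProxy(proxy_args=config.epd_proxy_args, env_dict=config.envs) as proxy,
    ):
        await _dispatch_tests(config, proxy)
        _run_benchmarks(config, proxy.port)
else:
    # openai: a single OpenAI-compatible vLLM server
    with RemoteOpenAIServer(
        model=config.model,
        vllm_serve_args=config.server_cmd,
        server_port=config.server_port,
        env_dict=config.envs,
        auto_port=False,
    ) as server:
        await _dispatch_tests(config, server)
        _run_benchmarks(config, config.server_port)
```

Both branches keep the server processes alive until all phases and benchmarks finish, so individual handlers never need to manage startup or shutdown.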
### 1.3 Function Call Relationships (Dispatcher)
`test_content` is a list of “phases”. Each phase maps to one handler function.
```txt
For each test_case:
test_content (list of phases)
|
v
[Dispatcher]
|
+--> phase "completion" -> send completion request(s)
|
+--> phase "chat_completion" -> send chat completion request(s)
|
+--> phase "image" -> send multimodal image request(s)
|
\--> (extendable) add your own phase by registering a new handler
After phases:
if benchmarks is configured -> run aisbench
Notes:
- The dispatcher only controls "what to run"; service lifecycle is controlled by the service manager.
- Phases are intentionally small & composable so you can reuse them across YAML cases.
```
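In code, the dispatch step is just a loop over `test_content` against the handler registry. A minimal sketch (the real `_dispatch_tests` additionally skips the `benchmark_comparisons` pseudo-phase, which is only evaluated after `aisbench` has produced results):

```python
for test_name in config.test_content:
    handler = TEST_HANDLERS.get(test_name)
    if handler is not None:
        await handler(config, server)  # each phase is an async function
    else:
        logger.warning("No handler registered for test content type: %s", test_name)
```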
## 2. Running and Debugging Steps
### 2.1 Dependencies
Ensure you are in an NPU environment and have installed `pytest`, `pyyaml`, `openai`, and `aisbench`.
### 2.2 Local Execution
The framework uses the `CONFIG_YAML_PATH` environment variable to specify the configuration file.
```bash
# Switch to the project root directory
cd /vllm-workspace/vllm-ascend
# Run a specific yaml test
export CONFIG_YAML_PATH="Qwen3-32B.yaml"
pytest -sv tests/e2e/nightly/single_node/models/scripts/test_single_node.py
```
### 2.3 Tips for Debugging
* Only run a subset of cases: `pytest -sv ... -k <keyword>` (matches against the pytest case ids, i.e. the `name` fields from the YAML)
* Stop on first failure: `pytest -sv ... -x`
* Keep server logs visible: use `-s` (already included in `-sv`) and increase log verbosity via standard Python logging configuration if needed.
## 3. How to Write YAML Configuration Files
### 3.1 File Location and Selection Rules
* YAML files live under: `tests/e2e/nightly/single_node/models/configs/`
* Selected by env var: `CONFIG_YAML_PATH=<YourConfig>.yaml`
* If not set, the loader uses `SingleNodeConfigLoader.DEFAULT_CONFIG_NAME`
### 3.2 Field Descriptions
| Field Name | Type | Required | Default Value | Description |
| :--------------- | :--------- | :------- | :-------------- | :------------------------------------------------------------------ |
| `test_cases` | list | **Yes** | - | List of test case objects |
| `name` | string | **Yes** | - | Human-readable case ID shown in pytest output and logs |
| `model` | string | **Yes** | - | Model name or local path |
| `service_mode` | string | No | `openai` | Service mode: `openai` or `epd` (disaggregated) |
| `envs` | map | **Yes** | `{}` | Environment variables for the server process |
| `server_cmd` | list | Cond. | `[]` | vLLM startup arguments (Required for non-EPD) |
| `server_cmd_extra` | list | No | `[]` | Extra vLLM startup arguments appended after `server_cmd` |
| `prompts` | list | No | built-in default | Prompts for completion/chat tests |
| `api_keyword_args` | map | No | built-in default | OpenAI API keyword args (e.g., `max_tokens`, sampling params) |
| `test_content` | list | No | `["completion"]` | Test phases: `completion`, `chat_completion`, `image`, etc. |
| `benchmarks` | map | No | `{}` | Configuration for `aisbench` performance verification |
| `epd_server_cmds` | list[list] | Cond. | `[]` | (EPD only) Command lists for the Encode and Decode/PD server processes |
| `epd_proxy_args` | list | Cond. | `[]` | (EPD only) Startup arguments for the EPD routing proxy |
**Notes / Behaviors**
* `name` is mandatory and must be a non-empty string.
  * It is used directly as the pytest case id (e.g., `test_single_node[DeepSeek-R1-0528-W8A8-single]`).
  * It is also printed in the `[single-node][START]` marker for log navigation.
* `envs` (ports): the config object recognizes these keys: `SERVER_PORT`, `ENCODE_PORT`, `PD_PORT`, `PROXY_PORT`.
  * If a port key is missing or set to `DEFAULT_PORT`, it is automatically filled with an available open port.
  * `$SERVER_PORT` / `${SERVER_PORT}` placeholders in commands are expanded using `envs`.
* `server_cmd` vs `server_cmd_extra`:
  * YAML can define `server_cmd_extra` to append additional args after `server_cmd`.
  * The loader merges them into a single `server_cmd` list.
* Extra fields:
  * Any non-standard fields in a case are stored in `config.extra_config`.
  * This is how extension configs are passed through without changing the dataclass.
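To make these behaviors concrete, here is a small illustrative snippet (not part of the framework; it assumes the repo root is on `PYTHONPATH` and `vllm` is installed, and constructs a config directly the way `_parse_test_cases` does):

```python
from tests.e2e.nightly.single_node.models.scripts.single_node_config import SingleNodeConfig

config = SingleNodeConfig(
    name="demo",
    model="<model>",
    envs={"SERVER_PORT": "DEFAULT_PORT"},   # auto-filled with a free port in __post_init__
    server_cmd=["--port", "$SERVER_PORT"],  # placeholder expanded from envs
    extra_config={"my_custom_flag": True},  # non-standard fields pass through
)
assert config.server_cmd == ["--port", config.envs["SERVER_PORT"]]
assert config.my_custom_flag is True       # set via setattr from extra_config
```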
### 3.3 YAML Examples
#### Single-Case (similar to DeepSeek-R1-W8A8-HBM)
```yaml
test_cases:
  - name: "<your-case-name>"
    model: "<model-repo-or-local-path>"
    # Optional: The default values are as follows
    prompts:
      - "San Francisco is a"
    api_keyword_args:
      max_tokens: 10
    envs:
      SERVER_PORT: "DEFAULT_PORT"
      # Add only what you need.
    server_cmd:
      - "--port"
      - "$SERVER_PORT"
      # plus your vLLM serve args...
    # Optional: omit -> defaults to ["completion"]
    test_content:
      - "chat_completion"
    # Optional: leave empty if you don't run aisbench
    benchmarks:
```
#### Multi-Case + Shared Anchors
```yaml
_envs: &envs
  SERVER_PORT: "DEFAULT_PORT"
  # shared envs...

_server_cmd: &server_cmd
  - "--port"
  - "$SERVER_PORT"
  # shared vLLM serve args...

_benchmarks: &benchmarks
  perf:
    case_type: performance
    dataset_path: vllm-ascend/GSM8K-in3500-bs400
    request_conf: vllm_api_stream_chat
    dataset_conf: gsm8k/gsm8k_gen_0_shot_cot_str_perf
    num_prompts: 400
    max_out_len: 1500
    batch_size: 1000
    baseline: 1
    threshold: 0.97

test_cases:
  - name: "case-a"
    model: "<model>"
    envs:
      <<: *envs
      DYNAMIC_EPLB: "true"
      # private envs...
    server_cmd: *server_cmd
    server_cmd_extra:
      - "--enforce-eager"
    benchmarks:
  - name: "case-b"
    model: "<model>"
    envs:
      <<: *envs
    server_cmd: *server_cmd
    benchmarks:
      <<: *benchmarks
```
#### EPD / Disaggregated Case
```yaml
test_cases:
  - name: "<your-epd-case>"
    model: "<model>"
    service_mode: "epd"
    envs:
      ENCODE_PORT: "DEFAULT_PORT"
      PD_PORT: "DEFAULT_PORT"
      PROXY_PORT: "DEFAULT_PORT"
    epd_server_cmds:
      - ["--port", "$ENCODE_PORT", "--model", "<encode-model>"]
      - ["--port", "$PD_PORT", "--model", "<decode-model>"]
    epd_proxy_args:
      - "--host"
      - "127.0.0.1"
      - "--port"
      - "$PROXY_PORT"
      - "--encode-servers-urls"
      - "http://localhost:$ENCODE_PORT"
      - "--decode-servers-urls"
      - "http://localhost:$PD_PORT"
      - "--prefill-servers-urls"
      - "disable"
    test_content:
      - "chat_completion"
```
## 4. How to Add Custom Tests (Extension)
### Step 1: Write your test logic in `test_single_node.py`
```python
async def run_video_test(config: SingleNodeConfig, server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    client = server.get_async_client()
    # Your custom logic here...
```
### Step 2: Register your function in `TEST_HANDLERS`
```python
TEST_HANDLERS = {
    "completion": run_completion_test,
    "video": run_video_test,  # Registered!
}
```
### Step 3: Enable in YAML
```yaml
test_content:
  - "completion"
  - "video"
```
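Putting the three steps together, a slightly fuller handler might look like this. This is an illustrative sketch only: the `video_url` content part and its payload shape are placeholders that must match what your model and server actually accept.

```python
async def run_video_test(config: SingleNodeConfig,
                         server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    client = server.get_async_client()
    # Send a multimodal chat request; reuses the per-case sampling args.
    response = await client.chat.completions.create(
        model=config.model,
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this video."},
                {"type": "video_url", "video_url": {"url": "<your-video-url>"}},
            ],
        }],
        **config.api_keyword_args,
    )
    assert response.choices[0].message.content, "empty response"
```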
## 5. Checklist (Before Submitting a New YAML)
* `test_cases` exists and is a list
* Each case contains the required fields for its `service_mode`
  * Common (all modes): `name`, `model`, `envs`
  * `openai`: `server_cmd`
  * `epd`: `epd_server_cmds`, `epd_proxy_args`
* Port envs are set to `DEFAULT_PORT` (or to explicit free ports)
* If using `benchmarks`, ensure each benchmark case includes the required aisbench fields (e.g., `case_type`, `dataset_path`, `request_conf`, `dataset_conf`, `max_out_len`, `batch_size`)
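As a quick pre-submit sanity check, you can load your YAML through the real loader without starting any server (a sketch; run from the repo root with `vllm` installed):

```python
import os

os.environ["CONFIG_YAML_PATH"] = "<YourConfig>.yaml"

from tests.e2e.nightly.single_node.models.scripts.single_node_config import (
    SingleNodeConfigLoader,
)

# Raises early if required fields are missing or malformed.
for cfg in SingleNodeConfigLoader.from_yaml_cases():
    print(cfg.name, cfg.service_mode, cfg.envs)
```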

View File

@@ -0,0 +1,16 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#

View File: tests/e2e/nightly/single_node/models/scripts/single_node_config.py

@@ -0,0 +1,183 @@
import logging
import os
import re
from dataclasses import dataclass, field
from typing import Any

import yaml
from vllm.utils.network_utils import get_open_port

CONFIG_BASE_PATH = "tests/e2e/nightly/single_node/models/configs"

logger = logging.getLogger(__name__)

# Default prompts and API args fallback
PROMPTS = [
    "San Francisco is a",
]
API_KEYWORD_ARGS = {
    "max_tokens": 10,
}


@dataclass
class SingleNodeConfig:
    name: str
    model: str
    envs: dict[str, Any] = field(default_factory=dict)
    prompts: list[str] = field(default_factory=lambda: PROMPTS)
    api_keyword_args: dict[str, Any] = field(default_factory=lambda: API_KEYWORD_ARGS)
    benchmarks: dict[str, Any] = field(default_factory=dict)
    server_cmd: list[str] = field(default_factory=list)
    test_content: list[str] = field(default_factory=lambda: ["completion"])
    service_mode: str = "openai"
    epd_server_cmds: list[list[str]] = field(default_factory=list)
    epd_proxy_args: list[str] = field(default_factory=list)
    extra_config: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        port_keys = ["SERVER_PORT", "ENCODE_PORT", "PD_PORT", "PROXY_PORT"]
        for env_key in port_keys:
            if self.envs.get(env_key) in ["DEFAULT_PORT", None]:
                self.envs[env_key] = str(get_open_port())
        if self.prompts is None:
            self.prompts = PROMPTS
        if self.api_keyword_args is None:
            self.api_keyword_args = API_KEYWORD_ARGS
        if self.benchmarks is None:
            self.benchmarks = {}
        if self.test_content is None:
            self.test_content = []
        self.server_cmd = self._expand_values(self.server_cmd or [], self.envs)
        self.epd_server_cmds = [self._expand_values(cmd, self.envs) for cmd in self.epd_server_cmds]
        self.epd_proxy_args = self._expand_values(self.epd_proxy_args or [], self.envs)
        for key, value in self.extra_config.items():
            setattr(self, key, value)

    @staticmethod
    def _expand_values(values: list[str], envs: dict[str, Any]) -> list[str]:
        """Interpolate $VAR/${VAR} placeholders with provided env values."""
        pattern = re.compile(r"\$(\w+)|\$\{(\w+)\}")

        def repl(m: re.Match[str]) -> str:
            key = m.group(1) or m.group(2)
            return str(envs.get(key, m.group(0)))

        return [pattern.sub(repl, str(arg)) for arg in values]

    def _get_required_port(self, key: str) -> int:
        value = self.envs.get(key)
        if value is None:
            raise ValueError(f"Missing required port env: {key}")
        return int(value)

    @property
    def server_port(self) -> int:
        return self._get_required_port("SERVER_PORT")

    @property
    def encode_port(self) -> int:
        return self._get_required_port("ENCODE_PORT")

    @property
    def pd_port(self) -> int:
        return self._get_required_port("PD_PORT")

    @property
    def proxy_port(self) -> int:
        return self._get_required_port("PROXY_PORT")


class SingleNodeConfigLoader:
    """Load SingleNodeConfig from yaml file."""

    DEFAULT_CONFIG_NAME = "Kimi-K2-Thinking.yaml"
    STANDARD_CASE_FIELDS = {
        "name",
        "model",
        "envs",
        "prompts",
        "api_keyword_args",
        "benchmarks",
        "service_mode",
        "server_cmd",
        "server_cmd_extra",
        "test_content",
        "epd_server_cmds",
        "epd_proxy_args",
    }

    @classmethod
    def from_yaml_cases(cls, yaml_path: str | None = None) -> list[SingleNodeConfig]:
        config = cls._load_yaml(yaml_path)
        if "test_cases" not in config:
            raise KeyError("test_cases field is required in config yaml")
        cases = config.get("test_cases")
        if not isinstance(cases, list):
            raise TypeError("test_cases must be a list")
        cls._validate_para(cases)
        return cls._parse_test_cases(cases)

    @classmethod
    def _load_yaml(cls, yaml_path: str | None) -> dict[str, Any]:
        if not yaml_path:
            yaml_path = os.getenv("CONFIG_YAML_PATH", cls.DEFAULT_CONFIG_NAME)
        full_path = os.path.join(CONFIG_BASE_PATH, yaml_path)
        logger.info("Loading config yaml: %s", full_path)
        with open(full_path) as f:
            return yaml.safe_load(f)

    @staticmethod
    def _validate_para(cases: list[dict[str, Any]]) -> None:
        if not cases:
            raise ValueError("test_cases is empty")
        for case in cases:
            mode = case.get("service_mode", "openai")
            required = ["name", "model", "envs"]
            if mode == "epd":
                required.extend(["epd_server_cmds", "epd_proxy_args"])
            else:
                required.append("server_cmd")
            missing = [k for k in required if k not in case]
            if missing:
                raise KeyError(f"Missing required config fields: {missing}")
            if not isinstance(case["name"], str) or not case["name"].strip():
                raise ValueError("test case field 'name' must be a non-empty string")

    @classmethod
    def _parse_test_cases(cls, cases: list[dict[str, Any]]) -> list[SingleNodeConfig]:
        result: list[SingleNodeConfig] = []
        for case in cases:
            server_cmd = case.get("server_cmd", [])
            server_cmd_extra = case.get("server_cmd_extra", [])
            full_cmd = list(server_cmd) + list(server_cmd_extra)
            extra_case_fields = {key: value for key, value in case.items() if key not in cls.STANDARD_CASE_FIELDS}
            # Safe parsing mapping
            result.append(
                SingleNodeConfig(
                    name=case["name"],
                    model=case["model"],
                    envs=case.get("envs", {}),
                    server_cmd=full_cmd,
                    epd_server_cmds=case.get("epd_server_cmds", []),
                    epd_proxy_args=case.get("epd_proxy_args", []),
                    benchmarks=case.get("benchmarks", {}),
                    prompts=case.get("prompts", PROMPTS),
                    api_keyword_args=case.get("api_keyword_args", API_KEYWORD_ARGS),
                    test_content=case.get("test_content", ["completion"]),
                    service_mode=case.get("service_mode", "openai"),
                    extra_config=extra_case_fields,
                )
            )
        return result

View File: tests/e2e/nightly/single_node/models/scripts/test_single_node.py

@@ -0,0 +1,165 @@
import logging
from typing import Any

import openai
import pytest

from tests.e2e.conftest import DisaggEpdProxy, RemoteEPDServer, RemoteOpenAIServer
from tests.e2e.nightly.single_node.models.scripts.single_node_config import (
    SingleNodeConfig,
    SingleNodeConfigLoader,
)
from tools.aisbench import run_aisbench_cases

logger = logging.getLogger(__name__)

configs = SingleNodeConfigLoader.from_yaml_cases()


async def run_completion_test(config: SingleNodeConfig, server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    client = server.get_async_client()
    batch = await client.completions.create(
        model=config.model,
        prompt=config.prompts,
        **config.api_keyword_args,
    )
    choices: list[openai.types.CompletionChoice] = batch.choices
    assert choices[0].text, "empty response"
    print(choices)


async def run_image_test(config: SingleNodeConfig, server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    from tools.send_mm_request import send_image_request

    send_image_request(config.model, server)


async def run_chat_completion_test(config: SingleNodeConfig, server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    from tools.send_request import send_v1_chat_completions

    send_v1_chat_completions(
        config.prompts[0],
        model=config.model,
        server=server,
        request_args=config.api_keyword_args,
    )


def run_benchmark_comparisons(config: SingleNodeConfig, results: Any) -> None:
    """General assertion engine for aisbench outcomes mapped directly from YAML.

    Each entry of ``benchmark_comparisons_args`` (passed through
    ``config.extra_config``) is a mapping with the keys ``baseline`` and
    ``target`` (benchmark case names), plus optional ``metric`` (default
    "TTFT", currently the only supported metric), ``ratio`` (default 1.0)
    and ``operator`` (one of "<", ">", "<=", ">=", default "<").
    """
    comparisons = config.extra_config.get("benchmark_comparisons_args", [])
    if not comparisons:
        return
    # Valid task keys defined in benchmarks mapping
    valid_keys = [k for k, v in config.benchmarks.items() if v]
    metrics_cache = {}
    for comp in comparisons:
        metric = comp.get("metric", "TTFT")
        baseline_key = comp.get("baseline")
        target_key = comp.get("target")
        ratio = comp.get("ratio", 1.0)
        op = comp.get("operator", "<")
        if not baseline_key or not target_key:
            logger.warning("Invalid comparison config: missing baseline or target. %s", comp)
            continue
        if metric not in metrics_cache:
            if metric == "TTFT":
                from tools.aisbench import get_TTFT

                # map TTFT outputs directly to their corresponding benchmark test case names
                metrics_cache[metric] = dict(zip(valid_keys, get_TTFT(results)))
            else:
                logger.warning("Unsupported metric for comparison: %s", metric)
                continue
        metric_dict = metrics_cache[metric]
        baseline_val = metric_dict.get(baseline_key)
        target_val = metric_dict.get(target_key)
        if baseline_val is None or target_val is None:
            logger.warning("Missing data to compare %s and %s in metrics: %s", baseline_key, target_key, metric_dict)
            continue
        expected_threshold = baseline_val * ratio
        eval_str = f"metric {metric}: {target_key}({target_val}) {op} {baseline_key}({baseline_val}) * {ratio}"
        if op == "<":
            assert target_val < expected_threshold, f"Assertion Failed: {eval_str} [threshold: {expected_threshold}]"
        elif op == ">":
            assert target_val > expected_threshold, f"Assertion Failed: {eval_str} [threshold: {expected_threshold}]"
        elif op == "<=":
            assert target_val <= expected_threshold, f"Assertion Failed: {eval_str} [threshold: {expected_threshold}]"
        elif op == ">=":
            assert target_val >= expected_threshold, f"Assertion Failed: {eval_str} [threshold: {expected_threshold}]"
        else:
            logger.warning("Unsupported comparison operator: %s", op)
            continue
        print(f"✅ Comparison passed: {eval_str} [threshold: {expected_threshold}]")


# Extend this dictionary to add new test capabilities
TEST_HANDLERS = {
    "completion": run_completion_test,
    "image": run_image_test,
    "chat_completion": run_chat_completion_test,
}


async def _dispatch_tests(config: SingleNodeConfig, server: "RemoteOpenAIServer | DisaggEpdProxy") -> None:
    """Dispatches requested tests defined in yaml."""
    for test_name in config.test_content:
        if test_name == "benchmark_comparisons":
            continue
        handler = TEST_HANDLERS.get(test_name)
        if handler:
            await handler(config, server)
        else:
            logger.warning("No handler registered for test content type: %s", test_name)


def _run_benchmarks(config: SingleNodeConfig, port: int) -> None:
    """Run Aisbench benchmarks and process benchmark-dependent custom assertions."""
    aisbench_cases = [v for v in config.benchmarks.values() if v]
    if not aisbench_cases:
        return
    result = run_aisbench_cases(
        model=config.model,
        port=port,
        aisbench_cases=aisbench_cases,
    )
    if "benchmark_comparisons" in config.test_content:
        run_benchmark_comparisons(config, result)


@pytest.mark.asyncio
@pytest.mark.parametrize("config", configs, ids=[config.name for config in configs])
async def test_single_node(config: SingleNodeConfig) -> None:
    if config.service_mode == "epd":
        with (
            RemoteEPDServer(vllm_serve_args=config.epd_server_cmds, env_dict=config.envs) as _,
            DisaggEpdProxy(proxy_args=config.epd_proxy_args, env_dict=config.envs) as proxy,
        ):
            await _dispatch_tests(config, proxy)
            _run_benchmarks(config, proxy.port)
        return
    # Standard OpenAI service mode
    with RemoteOpenAIServer(
        model=config.model,
        vllm_serve_args=config.server_cmd,
        server_port=config.server_port,
        env_dict=config.envs,
        auto_port=False,
    ) as server:
        await _dispatch_tests(config, server)
        _run_benchmarks(config, config.server_port)