[TEST]Add 2P1D multi node cases for nightly test (#3764)
### What this PR does / why we need it?
This PR adds the 2P1D (two prefill nodes, one decode node) multi-node functional/accuracy/performance test cases, which we need to run daily.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running the test.
- vLLM version: v0.11.0rc3
- vLLM main: c9461e05a4
---------
Signed-off-by: jiangyunfan1 <jiangyunfan1@h-partners.com>
Signed-off-by: wangli <wangli858794774@gmail.com>
Co-authored-by: wangli <wangli858794774@gmail.com>
```diff
@@ -1,13 +1,98 @@
+import time
+from typing import Any, List, Optional, Union
+
+import httpx
+import pytest
+from modelscope import snapshot_download  # type: ignore
+from requests.exceptions import ConnectionError, HTTPError, Timeout
+
 from tests.e2e.conftest import RemoteOpenAIServer
 from tests.e2e.nightly.multi_node.config.multi_node_config import (
     DISAGGREGATED_PREFILL_PROXY_SCRIPT, MultiNodeConfig)
+from tools.aisbench import run_aisbench_cases
+
+prompts = [
+    "San Francisco is a",
+]
+
+api_keyword_args = {
+    "max_tokens": 10,
+}
 
 
-def test_multi_node() -> None:
+def get_local_model_path_with_retry(
+    model: str,
+    revision: str = "master",
+    max_retries: int = 5,
+    delay: int = 5,
+) -> Optional[str]:
+    # Try to fetch the weights from ModelScope up to max_retries times,
+    # sleeping `delay` seconds between attempts; None means every attempt failed.
+    for attempt in range(1, max_retries + 1):
+        try:
+            local_model_path = snapshot_download(
+                model_id=model,
+                revision=revision,
+            )
+            return local_model_path
+        except HTTPError:
+            pass  # server-side error: fall through to the back-off below
+        except (ConnectionError, Timeout):
+            pass  # transient network error: fall through to the back-off below
+        if attempt < max_retries:
+            time.sleep(delay)
+    return None
```
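A minimal usage sketch of the retry helper (the model ID below is a placeholder, not one used by this PR):

```python
# Hypothetical caller: download with a short back-off and abort the run
# if no local weights could be fetched.
local_path = get_local_model_path_with_retry(
    "Qwen/Qwen2.5-0.5B-Instruct",  # placeholder ModelScope model ID
    max_retries=3,
    delay=2,
)
if local_path is None:
    raise SystemExit("model weights unavailable; aborting nightly run")
```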
The same hunk also adds an async client helper for the OpenAI-compatible endpoint:

```diff
+
+
+async def get_completions(url: str, model: str, prompts: Union[str, List[str]],
+                          **api_kwargs: Any) -> List[str]:
+    """
+    Asynchronously send HTTP requests to a /v1/completions endpoint.
+
+    Args:
+        url: Full endpoint URL, e.g. "http://localhost:1025/v1/completions"
+        model: Model name or local model path
+        prompts: A single prompt string or a list of prompts
+        **api_kwargs: Additional parameters (e.g., max_tokens, temperature)
+
+    Returns:
+        List[str]: A list of generated texts corresponding to each prompt
+    """
+    headers = {"Content-Type": "application/json"}
+
+    if isinstance(prompts, str):
+        prompts = [prompts]
+
+    results = []
+    async with httpx.AsyncClient(timeout=600.0) as client:
+        for prompt in prompts:
+            payload = {"model": model, "prompt": prompt, **api_kwargs}
+
+            response = await client.post(url, headers=headers, json=payload)
+            if response.status_code != 200:
+                raise RuntimeError(
+                    f"Request failed with status {response.status_code}: {response.text}"
+                )
+
+            resp_json = response.json()
+            choices = resp_json.get("choices", [])
+            if not choices or not choices[0].get("text"):
+                raise ValueError("Empty response from server")
+
+            results.append(choices[0]["text"])
+
+    return results
```
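Driven standalone, the helper looks like this (the URL and model path are placeholders; this assumes an OpenAI-compatible vLLM server is already up):

```python
import asyncio

# Hypothetical driver for get_completions; nothing here is wired into the
# test itself.
texts = asyncio.run(
    get_completions(
        url="http://localhost:8000/v1/completions",  # placeholder endpoint
        model="/path/to/local/weights",              # placeholder model path
        prompts=["San Francisco is a"],
        max_tokens=10,  # forwarded into the JSON payload via **api_kwargs
    ))
print(texts[0])
```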
The entry point itself becomes a coroutine and resolves the local weights before any server is launched:

```diff
+
+
+@pytest.mark.asyncio
+async def test_multi_node() -> None:
     config = MultiNodeConfig.from_yaml()
+    local_model_path = get_local_model_path_with_retry(config.model)
+    assert local_model_path is not None, "cannot find any local weights for the test"
     env_dict = config.envs
-    # perf_cmd = config.perf_cmd
-    # acc_cmd = config.acc_cmd
+    perf_cmd = config.perf_cmd
+    acc_cmd = config.acc_cmd
     nodes_info = config.nodes_info
     disaggregated_prefill = config.disaggregated_prefill
     server_port = config.server_port
```
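The `@pytest.mark.asyncio` marker requires the pytest-asyncio plugin to be installed (an assumption about the suite's tooling; the plugin is the standard way to collect async tests). A self-contained illustration:

```python
import asyncio

import pytest


# Minimal example of the marker used above; pytest-asyncio runs the
# decorated coroutine inside its own event loop.
@pytest.mark.asyncio
async def test_marker_illustration() -> None:
    await asyncio.sleep(0)  # any awaitable works here
```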
The server is now launched from the downloaded weights rather than the raw model ID:

```diff
@@ -15,7 +100,7 @@ def test_multi_node() -> None:
     server_host = config.cluster_ips[0]
     with config.launch_server_proxy(DISAGGREGATED_PREFILL_PROXY_SCRIPT):
         with RemoteOpenAIServer(
-                model=config.model,
+                model=local_model_path,
                 vllm_serve_args=config.server_cmd,
                 server_port=server_port,
                 server_host=server_host,
```
Finally, the master node replaces the old TODO stubs with a live completion request plus the aisbench cases, while the other nodes just hold their processes open:

```diff
@@ -26,11 +111,17 @@ def test_multi_node() -> None:
                 nodes_info=nodes_info,
                 max_wait_seconds=2000,
         ) as remote_server:
-            # base_url = remote_server.url_root
             if config.is_master:
-                pass
-                # TODO: enable perf and acc test
-                # subprocess.run(perf_cmd, check=True)
-                # subprocess.run(acc_cmd, check=True)
+                port = proxy_port if disaggregated_prefill else server_port
+                base_url = f"http://localhost:{port}/v1/completions"
+                _ = await get_completions(url=base_url,
+                                          model=local_model_path,
+                                          prompts=prompts,
+                                          **api_keyword_args)  # unpack so max_tokens lands in the payload
+                # aisbench test
+                if acc_cmd:
+                    run_aisbench_cases(local_model_path, port, acc_cmd)
+                if perf_cmd:
+                    run_aisbench_cases(local_model_path, port, perf_cmd)
             else:
                 remote_server.hang_until_terminated()
```
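Taken together, the attributes the test reads from `MultiNodeConfig` form its implicit contract. A hypothetical stand-in that lists them (only the names come from the diff; every type is an assumption, and `proxy_port`, used on the disaggregated-prefill path, comes from context elided between hunks):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Hypothetical shape of MultiNodeConfig as seen from test_multi_node();
# the real class lives under tests/e2e/nightly/multi_node/config/.
@dataclass
class MultiNodeConfigShape:
    model: str                        # ModelScope ID fed to snapshot_download
    server_cmd: List[str]             # vllm serve arguments
    server_port: int                  # port the vLLM server listens on
    cluster_ips: List[str]            # cluster_ips[0] hosts the server
    is_master: bool                   # only the master node issues requests
    disaggregated_prefill: bool       # 2P1D mode: route through the proxy
    envs: Dict[str, str] = field(default_factory=dict)
    perf_cmd: Optional[Any] = None    # aisbench performance cases
    acc_cmd: Optional[Any] = None     # aisbench accuracy cases
    nodes_info: Optional[Any] = None  # per-node launch information
```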