229 lines
7.0 KiB
Python
229 lines
7.0 KiB
Python
import threading
|
|
import time
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
import requests
|
|
|
|
from sglang.test.run_eval import run_eval
|
|
|
|
|
|
def _wait_for_workers(
|
|
base_url: str, expected_count: int, timeout: float = 60.0, headers: dict = None
|
|
) -> None:
|
|
"""Poll /workers endpoint until expected number of workers are registered."""
|
|
start = time.perf_counter()
|
|
with requests.Session() as session:
|
|
while time.perf_counter() - start < timeout:
|
|
try:
|
|
r = session.get(f"{base_url}/workers", headers=headers, timeout=5)
|
|
if r.status_code == 200:
|
|
workers = r.json().get("workers", [])
|
|
if len(workers) >= expected_count:
|
|
return
|
|
except requests.RequestException:
|
|
pass
|
|
time.sleep(0.5)
|
|
raise TimeoutError(
|
|
f"Expected {expected_count} workers at {base_url}, timed out after {timeout}s"
|
|
)
|
|
|
|
|
|
@pytest.mark.e2e
|
|
def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model):
|
|
# Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance
|
|
base = e2e_router_only_rr.url
|
|
for w in e2e_two_workers_dp2:
|
|
r = requests.post(f"{base}/workers", json={"url": w.url}, timeout=180)
|
|
assert (
|
|
r.status_code == 202
|
|
), f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
# Wait for workers to be registered
|
|
_wait_for_workers(base, expected_count=2, timeout=60.0)
|
|
|
|
args = SimpleNamespace(
|
|
base_url=base,
|
|
model=e2e_model,
|
|
eval_name="mmlu",
|
|
num_examples=64,
|
|
num_threads=32,
|
|
temperature=0.1,
|
|
)
|
|
metrics = run_eval(args)
|
|
assert metrics["score"] >= 0.65
|
|
|
|
|
|
@pytest.mark.e2e
|
|
def test_genai_bench(
|
|
e2e_router_only_rr, e2e_two_workers_dp2, e2e_model, genai_bench_runner
|
|
):
|
|
"""Attach a worker to the regular router and run a short genai-bench."""
|
|
base = e2e_router_only_rr.url
|
|
for w in e2e_two_workers_dp2:
|
|
r = requests.post(f"{base}/workers", json={"url": w.url}, timeout=180)
|
|
assert (
|
|
r.status_code == 202
|
|
), f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
# Wait for workers to be registered
|
|
_wait_for_workers(base, expected_count=2, timeout=60.0)
|
|
|
|
genai_bench_runner(
|
|
router_url=base,
|
|
model_path=e2e_model,
|
|
experiment_folder="benchmark_round_robin_regular",
|
|
thresholds={
|
|
"ttft_mean_max": 6,
|
|
"e2e_latency_mean_max": 14,
|
|
"input_throughput_mean_min": 800, # temp relax from 1000 to 800 for now
|
|
"output_throughput_mean_min": 12,
|
|
# Enforce GPU utilization p50 >= 99% during the run.
|
|
"gpu_util_p50_min": 99,
|
|
},
|
|
kill_procs=e2e_two_workers_dp2,
|
|
)
|
|
|
|
|
|
@pytest.mark.e2e
|
|
def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
|
|
base = e2e_router_only_rr.url
|
|
worker_url = e2e_primary_worker.url
|
|
|
|
r = requests.post(f"{base}/workers", json={"url": worker_url}, timeout=180)
|
|
assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
# Wait for worker to be registered
|
|
_wait_for_workers(base, expected_count=1, timeout=60.0)
|
|
|
|
with requests.Session() as s:
|
|
for i in range(8):
|
|
r = s.post(
|
|
f"{base}/v1/completions",
|
|
json={
|
|
"model": e2e_model,
|
|
"prompt": f"x{i}",
|
|
"max_tokens": 1,
|
|
"stream": False,
|
|
},
|
|
timeout=120,
|
|
)
|
|
r.raise_for_status()
|
|
|
|
# Remove the worker
|
|
from urllib.parse import quote
|
|
|
|
encoded_url = quote(worker_url, safe="")
|
|
r = requests.delete(f"{base}/workers/{encoded_url}", timeout=60)
|
|
assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
|
|
@pytest.mark.e2e
|
|
def test_lazy_fault_tolerance_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
|
|
base = e2e_router_only_rr.url
|
|
worker = e2e_primary_worker
|
|
|
|
r = requests.post(f"{base}/workers", json={"url": worker.url}, timeout=180)
|
|
assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
# Wait for worker to be registered
|
|
_wait_for_workers(base, expected_count=1, timeout=60.0)
|
|
|
|
def killer():
|
|
time.sleep(10)
|
|
try:
|
|
worker.proc.terminate()
|
|
except Exception:
|
|
pass
|
|
|
|
t = threading.Thread(target=killer, daemon=True)
|
|
t.start()
|
|
|
|
args = SimpleNamespace(
|
|
base_url=base,
|
|
model=e2e_model,
|
|
eval_name="mmlu",
|
|
num_examples=32,
|
|
num_threads=16,
|
|
temperature=0.0,
|
|
)
|
|
metrics = run_eval(args)
|
|
assert 0.0 <= metrics["score"] <= 1.0
|
|
|
|
|
|
@pytest.mark.e2e
|
|
def test_dp_aware_worker_expansion_and_api_key(
|
|
e2e_model,
|
|
e2e_router_only_rr_dp_aware_api,
|
|
e2e_worker_dp2_api,
|
|
):
|
|
"""
|
|
Launch a router-only instance in dp_aware mode and a single worker with dp_size=2
|
|
and API key protection. Verify expansion, auth enforcement, and basic eval.
|
|
"""
|
|
import os
|
|
|
|
router_url = e2e_router_only_rr_dp_aware_api.url
|
|
worker_url = e2e_worker_dp2_api.url
|
|
api_key = e2e_router_only_rr_dp_aware_api.api_key
|
|
|
|
# Attach worker; router should expand to dp_size logical workers
|
|
r = requests.post(
|
|
f"{router_url}/workers",
|
|
json={"url": worker_url, "api_key": api_key},
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
timeout=180,
|
|
)
|
|
assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
|
|
|
|
# Wait for workers to be registered and expanded
|
|
_wait_for_workers(
|
|
router_url,
|
|
expected_count=2,
|
|
timeout=60.0,
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
)
|
|
|
|
# Verify the expanded workers have correct URLs
|
|
r = requests.get(
|
|
f"{router_url}/workers",
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
timeout=30,
|
|
)
|
|
r.raise_for_status()
|
|
workers = r.json().get("workers", [])
|
|
urls = [w["url"] for w in workers]
|
|
assert len(urls) == 2
|
|
assert set(urls) == {f"{worker_url}@0", f"{worker_url}@1"}
|
|
|
|
# Verify API key enforcement
|
|
# 1) Without Authorization -> Should get 401 Unauthorized
|
|
r = requests.post(
|
|
f"{router_url}/v1/completions",
|
|
json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
|
|
timeout=60,
|
|
)
|
|
assert r.status_code == 401
|
|
|
|
# 2) With correct Authorization -> 200
|
|
r = requests.post(
|
|
f"{router_url}/v1/completions",
|
|
json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
timeout=60,
|
|
)
|
|
assert r.status_code == 200
|
|
|
|
# Finally, run MMLU eval through the router with auth
|
|
os.environ["OPENAI_API_KEY"] = api_key
|
|
args = SimpleNamespace(
|
|
base_url=router_url,
|
|
model=e2e_model,
|
|
eval_name="mmlu",
|
|
num_examples=64,
|
|
num_threads=32,
|
|
temperature=0.1,
|
|
)
|
|
metrics = run_eval(args)
|
|
assert metrics["score"] >= 0.65
|