Files
sglang/sgl-router/py_test/e2e/test_regular_router.py

229 lines
7.0 KiB
Python

import threading
import time
from types import SimpleNamespace
from typing import Optional

import pytest
import requests

from sglang.test.run_eval import run_eval
def _wait_for_workers(
    base_url: str,
    expected_count: int,
    timeout: float = 60.0,
    headers: Optional[dict] = None,
) -> None:
    """Poll the router's ``/workers`` endpoint until enough workers register.

    Args:
        base_url: Router base URL, without a trailing slash.
        expected_count: Minimum number of workers that must appear in the
            ``workers`` list of the JSON response.
        timeout: Total number of seconds to keep polling before giving up.
        headers: Optional HTTP headers to send with each poll (e.g. an
            ``Authorization`` bearer token for API-key-protected routers).

    Raises:
        TimeoutError: If fewer than ``expected_count`` workers are registered
            after ``timeout`` seconds.
    """
    start = time.perf_counter()
    with requests.Session() as session:
        while time.perf_counter() - start < timeout:
            try:
                r = session.get(f"{base_url}/workers", headers=headers, timeout=5)
                if r.status_code == 200:
                    workers = r.json().get("workers", [])
                    if len(workers) >= expected_count:
                        return
            except requests.RequestException:
                # Router may not be accepting connections yet; keep polling
                # until the deadline instead of failing fast.
                pass
            time.sleep(0.5)
    raise TimeoutError(
        f"Expected {expected_count} workers at {base_url}, timed out after {timeout}s"
    )
@pytest.mark.e2e
def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model):
    """Attach two dp=2 workers to a fresh router and pass a short MMLU eval."""
    base = e2e_router_only_rr.url

    # Register each worker; the router acks registration asynchronously (202).
    for worker in e2e_two_workers_dp2:
        r = requests.post(f"{base}/workers", json={"url": worker.url}, timeout=180)
        assert (
            r.status_code == 202
        ), f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"

    # Registration is async, so block until both workers are visible.
    _wait_for_workers(base, expected_count=2, timeout=60.0)

    eval_args = SimpleNamespace(
        base_url=base,
        model=e2e_model,
        eval_name="mmlu",
        num_examples=64,
        num_threads=32,
        temperature=0.1,
    )
    metrics = run_eval(eval_args)
    assert metrics["score"] >= 0.65
@pytest.mark.e2e
def test_genai_bench(
    e2e_router_only_rr, e2e_two_workers_dp2, e2e_model, genai_bench_runner
):
    """Attach a worker to the regular router and run a short genai-bench."""
    base = e2e_router_only_rr.url

    # Register both dp=2 workers with the fresh router instance.
    for worker in e2e_two_workers_dp2:
        r = requests.post(f"{base}/workers", json={"url": worker.url}, timeout=180)
        assert (
            r.status_code == 202
        ), f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"

    # Registration is async; block until both workers show up.
    _wait_for_workers(base, expected_count=2, timeout=60.0)

    perf_thresholds = {
        "ttft_mean_max": 6,
        "e2e_latency_mean_max": 14,
        "input_throughput_mean_min": 800,  # temp relax from 1000 to 800 for now
        "output_throughput_mean_min": 12,
        # Enforce GPU utilization p50 >= 99% during the run.
        "gpu_util_p50_min": 99,
    }
    genai_bench_runner(
        router_url=base,
        model_path=e2e_model,
        experiment_folder="benchmark_round_robin_regular",
        thresholds=perf_thresholds,
        kill_procs=e2e_two_workers_dp2,
    )
@pytest.mark.e2e
def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Attach a worker, serve a few completions through it, then detach it."""
    from urllib.parse import quote

    base = e2e_router_only_rr.url
    worker_url = e2e_primary_worker.url

    # Attach the worker; 202 means the router accepted the registration.
    r = requests.post(f"{base}/workers", json={"url": worker_url}, timeout=180)
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
    _wait_for_workers(base, expected_count=1, timeout=60.0)

    # Route a handful of tiny non-streaming completions through the worker.
    with requests.Session() as s:
        for i in range(8):
            payload = {
                "model": e2e_model,
                "prompt": f"x{i}",
                "max_tokens": 1,
                "stream": False,
            }
            r = s.post(f"{base}/v1/completions", json=payload, timeout=120)
            r.raise_for_status()

    # Detach the worker; its URL must be fully percent-encoded into the path.
    encoded_url = quote(worker_url, safe="")
    r = requests.delete(f"{base}/workers/{encoded_url}", timeout=60)
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
@pytest.mark.e2e
def test_lazy_fault_tolerance_live(e2e_router_only_rr, e2e_primary_worker, e2e_model):
    """Kill the only worker mid-eval and check the router degrades gracefully."""
    base = e2e_router_only_rr.url
    worker = e2e_primary_worker

    r = requests.post(f"{base}/workers", json={"url": worker.url}, timeout=180)
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"
    _wait_for_workers(base, expected_count=1, timeout=60.0)

    def killer():
        # Give the eval a head start, then take the worker down abruptly.
        time.sleep(10)
        try:
            worker.proc.terminate()
        except Exception:
            pass

    threading.Thread(target=killer, daemon=True).start()

    eval_args = SimpleNamespace(
        base_url=base,
        model=e2e_model,
        eval_name="mmlu",
        num_examples=32,
        num_threads=16,
        temperature=0.0,
    )
    metrics = run_eval(eval_args)
    # The eval must finish without crashing; any in-range score is acceptable
    # since the worker dies partway through.
    assert 0.0 <= metrics["score"] <= 1.0
@pytest.mark.e2e
def test_dp_aware_worker_expansion_and_api_key(
    e2e_model,
    e2e_router_only_rr_dp_aware_api,
    e2e_worker_dp2_api,
):
    """
    Launch a router-only instance in dp_aware mode and a single worker with dp_size=2
    and API key protection. Verify expansion, auth enforcement, and basic eval.
    """
    import os

    router_url = e2e_router_only_rr_dp_aware_api.url
    worker_url = e2e_worker_dp2_api.url
    api_key = e2e_router_only_rr_dp_aware_api.api_key
    auth_headers = {"Authorization": f"Bearer {api_key}"}

    # Attach worker; router should expand to dp_size logical workers
    r = requests.post(
        f"{router_url}/workers",
        json={"url": worker_url, "api_key": api_key},
        headers=auth_headers,
        timeout=180,
    )
    assert r.status_code == 202, f"Expected 202 ACCEPTED, got {r.status_code}: {r.text}"

    # Wait for workers to be registered and expanded
    _wait_for_workers(
        router_url,
        expected_count=2,
        timeout=60.0,
        headers=auth_headers,
    )

    # Verify the expanded workers have correct URLs (one "@rank" suffix per dp rank)
    r = requests.get(f"{router_url}/workers", headers=auth_headers, timeout=30)
    r.raise_for_status()
    workers = r.json().get("workers", [])
    urls = [w["url"] for w in workers]
    assert len(urls) == 2
    assert set(urls) == {f"{worker_url}@0", f"{worker_url}@1"}

    # Verify API key enforcement
    # 1) Without Authorization -> Should get 401 Unauthorized
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        timeout=60,
    )
    assert r.status_code == 401

    # 2) With correct Authorization -> 200
    r = requests.post(
        f"{router_url}/v1/completions",
        json={"model": e2e_model, "prompt": "hi", "max_tokens": 1},
        headers=auth_headers,
        timeout=60,
    )
    assert r.status_code == 200

    # Finally, run MMLU eval through the router with auth. run_eval reads the
    # key from OPENAI_API_KEY; save and restore the previous value so the
    # mutation does not leak into other tests in this session.
    prev_key = os.environ.get("OPENAI_API_KEY")
    os.environ["OPENAI_API_KEY"] = api_key
    try:
        args = SimpleNamespace(
            base_url=router_url,
            model=e2e_model,
            eval_name="mmlu",
            num_examples=64,
            num_threads=32,
            temperature=0.1,
        )
        metrics = run_eval(args)
    finally:
        if prev_key is None:
            os.environ.pop("OPENAI_API_KEY", None)
        else:
            os.environ["OPENAI_API_KEY"] = prev_key
    assert metrics["score"] >= 0.65