Files
sglang/sgl-router/py_test/e2e/conftest.py
2025-09-06 16:19:28 -07:00

236 lines
5.9 KiB
Python

import socket
import subprocess
import time
from types import SimpleNamespace
from urllib.parse import urlparse
import pytest
import requests
from sglang.test.test_utils import (
DEFAULT_MODEL_NAME_FOR_TEST,
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
DEFAULT_URL_FOR_TEST,
)
def _find_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _parse_url(base_url: str) -> tuple[str, str]:
"""Parse a base URL and return (host, port) as strings.
This is more robust than simple string splitting and supports different schemes
and URL shapes like trailing paths.
"""
parsed = urlparse(base_url)
return parsed.hostname or "127.0.0.1", (
str(parsed.port) if parsed.port is not None else ""
)
def _wait_router_health(base_url: str, timeout: float) -> None:
    """Block until ``GET {base_url}/health`` answers with HTTP 200.

    The endpoint is polled with a 5-second request timeout and a 2-second
    pause between attempts. Request errors are treated as "not ready yet".

    Args:
        base_url: Base URL of the server being waited on.
        timeout: Maximum number of seconds during which new attempts start.

    Raises:
        TimeoutError: If no attempt returned HTTP 200 within ``timeout``.
    """
    health_url = f"{base_url}/health"

    def _is_healthy(http) -> bool:
        # A connection failure just means the server is not up yet.
        try:
            return http.get(health_url, timeout=5).status_code == 200
        except requests.RequestException:
            return False

    began = time.perf_counter()
    with requests.Session() as http:
        while time.perf_counter() - began < timeout:
            if _is_healthy(http):
                return
            time.sleep(2)
    raise TimeoutError("Router failed to become healthy in time")
def _popen_launch_router(
    model: str,
    base_url: str,
    dp_size: int,
    timeout: float,
    policy: str = "cache_aware",
) -> subprocess.Popen:
    """Start ``sglang_router.launch_server`` and wait until it is healthy.

    Args:
        model: Value passed as ``--model-path``.
        base_url: URL the server should listen on; its host and port become
            ``--host`` / ``--port`` and it is also used for the health check.
        dp_size: Value passed as ``--dp``.
        timeout: Seconds to wait for the ``/health`` endpoint to return 200.
        policy: Value passed as ``--router-policy``.

    Returns:
        The running subprocess handle.

    Raises:
        TimeoutError: If the server did not become healthy in time. The
            subprocess is terminated before the error propagates.
    """
    host, port = _parse_url(base_url)
    prom_port = _find_available_port()
    cmd = [
        "python3",
        "-m",
        "sglang_router.launch_server",
        "--model-path",
        model,
        "--host",
        host,
        "--port",
        port,
        "--dp",
        str(dp_size),
        "--router-policy",
        policy,
        "--allow-auto-truncate",
        "--router-prometheus-port",
        str(prom_port),
        "--router-prometheus-host",
        "127.0.0.1",
    ]
    proc = subprocess.Popen(cmd)
    try:
        _wait_router_health(base_url, timeout)
    except BaseException:
        # The caller never receives `proc` when the wait fails, so it cannot
        # clean up; stop the child here to avoid leaving an orphaned server.
        _terminate(proc)
        raise
    return proc
def _popen_launch_worker(
    model: str,
    base_url: str,
    *,
    dp_size: int | None = None,
    api_key: str | None = None,
) -> subprocess.Popen:
    """Start ``sglang.launch_server`` as a subprocess without waiting on it.

    Args:
        model: Value passed as ``--model-path``.
        base_url: URL whose host and port become ``--host`` / ``--port``.
        dp_size: When given, passed as ``--dp-size``.
        api_key: When given, passed as ``--api-key``.

    Returns:
        The subprocess handle; no readiness check is performed here.
    """
    worker_host, worker_port = _parse_url(base_url)

    # Optional flags are appended after the fixed arguments, in this order.
    optional_flags = []
    if dp_size is not None:
        optional_flags.extend(("--dp-size", str(dp_size)))
    if api_key is not None:
        optional_flags.extend(("--api-key", api_key))

    argv = [
        "python3",
        "-m",
        "sglang.launch_server",
        "--model-path",
        model,
        "--host",
        worker_host,
        "--port",
        worker_port,
        "--base-gpu-id",
        "0",
        *optional_flags,
    ]
    return subprocess.Popen(argv)
def _popen_launch_router_only(
    base_url: str,
    policy: str = "round_robin",
    timeout: float = 120.0,
    *,
    dp_aware: bool = False,
    api_key: str | None = None,
) -> subprocess.Popen:
    """Start ``sglang_router.launch_router`` and wait until it is healthy.

    Args:
        base_url: URL the router should listen on; its host and port become
            ``--host`` / ``--port`` and it is also used for the health check.
        policy: Value passed as ``--policy``.
        timeout: Seconds to wait for the ``/health`` endpoint to return 200.
        dp_aware: When true, ``--dp-aware`` is added to the command line.
        api_key: When given, passed as ``--api-key``.

    Returns:
        The running subprocess handle.

    Raises:
        TimeoutError: If the router did not become healthy in time. The
            subprocess is terminated before the error propagates.
    """
    host, port = _parse_url(base_url)
    prom_port = _find_available_port()
    cmd = [
        "python3",
        "-m",
        "sglang_router.launch_router",
        "--host",
        host,
        "--port",
        port,
        "--policy",
        policy,
    ]
    if dp_aware:
        cmd += ["--dp-aware"]
    if api_key is not None:
        cmd += ["--api-key", api_key]
    cmd += [
        "--prometheus-port",
        str(prom_port),
        "--prometheus-host",
        "127.0.0.1",
    ]
    proc = subprocess.Popen(cmd)
    try:
        _wait_router_health(base_url, timeout)
    except BaseException:
        # The caller never receives `proc` when the wait fails, so it cannot
        # clean up; stop the child here to avoid leaving an orphaned router.
        _terminate(proc)
        raise
    return proc
def _terminate(proc: subprocess.Popen, timeout: float = 120) -> None:
if proc is None:
return
proc.terminate()
start = time.perf_counter()
while proc.poll() is None:
if time.perf_counter() - start > timeout:
proc.kill()
break
time.sleep(1)
def pytest_configure(config):
    """Add the ``e2e`` marker line to the ``markers`` ini setting."""
    marker_line = "e2e: mark as end-to-end test"
    config.addinivalue_line("markers", marker_line)
@pytest.fixture(scope="session")
def e2e_model() -> str:
    """Provide the model name used by the e2e fixtures for the session."""
    # There is no override: the default test model is always used.
    model_name = DEFAULT_MODEL_NAME_FOR_TEST
    return model_name
@pytest.fixture
def e2e_router(e2e_model: str):
    """Launch the router via ``launch_server`` (dp=2) at the default test URL.

    Kept available, although the e2e tests use the router-only fixtures to
    avoid GPU contention. Yields a namespace with ``proc`` and ``url``; the
    process is terminated on teardown.
    """
    router_url = DEFAULT_URL_FOR_TEST
    router_proc = _popen_launch_router(
        model=e2e_model,
        base_url=router_url,
        dp_size=2,
        timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
    )
    try:
        handle = SimpleNamespace(proc=router_proc, url=router_url)
        yield handle
    finally:
        _terminate(router_proc)
@pytest.fixture
def e2e_router_only_rr():
    """Launch a router-only process with the ``round_robin`` policy.

    The router listens on a freshly picked loopback port. Yields a namespace
    with ``proc`` and ``url``; the process is terminated on teardown.
    """
    router_url = f"http://127.0.0.1:{_find_available_port()}"
    router_proc = _popen_launch_router_only(router_url, policy="round_robin")
    try:
        handle = SimpleNamespace(proc=router_proc, url=router_url)
        yield handle
    finally:
        _terminate(router_proc)
@pytest.fixture(scope="session")
def e2e_primary_worker(e2e_model: str):
    """Launch one worker for the whole session on a freshly picked port.

    The worker is not health-checked here; the router health gate is
    expected to cover worker readiness. Yields a namespace with ``proc``
    and ``url``; the process is terminated on teardown.
    """
    worker_url = f"http://127.0.0.1:{_find_available_port()}"
    worker_proc = _popen_launch_worker(e2e_model, worker_url)
    try:
        handle = SimpleNamespace(proc=worker_proc, url=worker_url)
        yield handle
    finally:
        _terminate(worker_proc)
@pytest.fixture
def e2e_router_only_rr_dp_aware_api():
    """Router-only with dp-aware enabled and an API key.

    Yields a namespace with ``proc``, ``url`` and ``api_key``; the process
    is terminated on teardown.
    """
    shared_key = "secret"
    router_url = f"http://127.0.0.1:{_find_available_port()}"
    router_proc = _popen_launch_router_only(
        router_url,
        policy="round_robin",
        timeout=180.0,
        dp_aware=True,
        api_key=shared_key,
    )
    try:
        handle = SimpleNamespace(proc=router_proc, url=router_url, api_key=shared_key)
        yield handle
    finally:
        _terminate(router_proc)
@pytest.fixture
def e2e_worker_dp2_api(e2e_model: str, e2e_router_only_rr_dp_aware_api):
    """Worker with dp-size=2 and the same API key as the dp-aware router.

    Yields a namespace with ``proc`` and ``url``; the process is terminated
    on teardown.
    """
    worker_url = f"http://127.0.0.1:{_find_available_port()}"
    worker_proc = _popen_launch_worker(
        e2e_model,
        worker_url,
        dp_size=2,
        api_key=e2e_router_only_rr_dp_aware_api.api_key,
    )
    try:
        handle = SimpleNamespace(proc=worker_proc, url=worker_url)
        yield handle
    finally:
        _terminate(worker_proc)