Files
sglang/sgl-router/py_test/e2e_response_api/router_fixtures.py

568 lines
18 KiB
Python

"""
Fixtures for launching OpenAI/XAI router for response API e2e testing.
This module provides fixtures for launching SGLang router with OpenAI or XAI backends:
1. Launch router with --backend openai pointing to OpenAI or XAI API
2. Configure history backend (memory or oracle)
This supports testing the Response API against real cloud providers.
"""
import logging
import os
import socket
import subprocess
import time
from typing import Optional
import requests
logger = logging.getLogger(__name__)
def wait_for_workers_ready(
router_url: str,
expected_workers: int,
timeout: int = 300,
api_key: Optional[str] = None,
) -> None:
"""
Wait for router to have all workers connected.
Polls the /workers endpoint until the 'total' field matches expected_workers.
Example response from /workers endpoint:
{"workers":[],"total":0,"stats":{"prefill_count":0,"decode_count":0,"regular_count":0}}
Args:
router_url: Base URL of router (e.g., "http://127.0.0.1:30000")
expected_workers: Number of workers expected to be connected
timeout: Max seconds to wait
api_key: Optional API key for authentication
"""
start_time = time.time()
last_error = None
attempt = 0
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
with requests.Session() as session:
while time.time() - start_time < timeout:
attempt += 1
elapsed = int(time.time() - start_time)
# Log progress every 10 seconds
if elapsed > 0 and elapsed % 10 == 0 and attempt % 10 == 0:
logger.info(
f" Still waiting for workers... ({elapsed}/{timeout}s elapsed)"
)
try:
response = session.get(
f"{router_url}/workers", headers=headers, timeout=5
)
if response.status_code == 200:
data = response.json()
total_workers = data.get("total", 0)
if total_workers == expected_workers:
logger.info(
f" All {expected_workers} workers connected after {elapsed}s"
)
return
else:
last_error = f"Workers: {total_workers}/{expected_workers}"
else:
last_error = f"HTTP {response.status_code}"
except requests.ConnectionError:
last_error = "Connection refused (router not ready yet)"
except requests.Timeout:
last_error = "Timeout"
except requests.RequestException as e:
last_error = str(e)
except (ValueError, KeyError) as e:
last_error = f"Invalid response: {e}"
time.sleep(1)
raise TimeoutError(
f"Router at {router_url} did not get {expected_workers} workers within {timeout}s.\n"
f"Last status: {last_error}\n"
f"Hint: Run with SHOW_ROUTER_LOGS=1 to see startup logs"
)
def find_free_port() -> int:
"""Find an available port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def wait_for_router_ready(
router_url: str,
timeout: int = 60,
api_key: Optional[str] = None,
) -> None:
"""
Wait for router to be ready.
Polls the /health endpoint until it returns 200.
Args:
router_url: Base URL of router (e.g., "http://127.0.0.1:30000")
timeout: Max seconds to wait
api_key: Optional API key for authentication
"""
start_time = time.time()
last_error = None
attempt = 0
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
with requests.Session() as session:
while time.time() - start_time < timeout:
attempt += 1
elapsed = int(time.time() - start_time)
# Log progress every 10 seconds
if elapsed > 0 and elapsed % 10 == 0 and attempt % 10 == 0:
logger.info(
f" Still waiting for router... ({elapsed}/{timeout}s elapsed)"
)
try:
response = session.get(
f"{router_url}/health", headers=headers, timeout=5
)
if response.status_code == 200:
logger.info(f" Router ready after {elapsed}s")
return
else:
last_error = f"HTTP {response.status_code}"
except requests.ConnectionError:
last_error = "Connection refused (router not ready yet)"
except requests.Timeout:
last_error = "Timeout"
except requests.RequestException as e:
last_error = str(e)
time.sleep(1)
raise TimeoutError(
f"Router at {router_url} did not become ready within {timeout}s.\n"
f"Last status: {last_error}\n"
f"Hint: Run with SHOW_ROUTER_LOGS=1 to see startup logs"
)
def popen_launch_openai_xai_router(
backend: str, # "openai" or "xai"
base_url: str,
timeout: int = 60,
history_backend: str = "memory",
api_key: Optional[str] = None,
router_args: Optional[list] = None,
stdout=None,
stderr=None,
prometheus_port: Optional[int] = None,
) -> dict:
"""
Launch SGLang router with OpenAI or XAI backend.
This approach:
1. Starts router with --backend openai
2. Points to OpenAI or XAI API via --worker-urls
3. Configures history backend (memory or oracle)
4. Waits for router health check to pass
Args:
backend: "openai" or "xai"
base_url: Base URL for router (e.g., "http://127.0.0.1:30000")
timeout: Timeout for router startup (default: 60s)
history_backend: "memory" or "oracle" (default: memory)
api_key: Optional API key for router authentication
router_args: Additional arguments for router
stdout: Optional file handle for router stdout
stderr: Optional file handle for router stderr
Returns:
dict with:
- router: router process object
- base_url: router URL (HTTP endpoint)
Example:
>>> cluster = popen_launch_openai_xai_router(
... "openai", "http://127.0.0.1:30000"
... )
>>> # Use cluster['base_url'] for HTTP requests
>>> # Cleanup:
>>> kill_process_tree(cluster['router'].pid)
"""
show_output = os.environ.get("SHOW_ROUTER_LOGS", "0") == "1"
# Parse router port from base_url
if ":" in base_url.split("//")[-1]:
router_port = int(base_url.split(":")[-1])
else:
router_port = find_free_port()
logger.info(f"\n{'='*70}")
logger.info(f"Launching {backend.upper()} router")
logger.info(f"{'='*70}")
logger.info(f" Backend: {backend}")
logger.info(f" Router port: {router_port}")
logger.info(f" History backend: {history_backend}")
# Determine worker URL based on backend
if backend == "openai":
worker_url = "https://api.openai.com"
# Get API key from environment
backend_api_key = os.environ.get("OPENAI_API_KEY")
if not backend_api_key:
raise ValueError(
"OPENAI_API_KEY environment variable must be set for OpenAI backend"
)
elif backend == "xai":
worker_url = "https://api.x.ai"
# Get API key from environment
backend_api_key = os.environ.get("XAI_API_KEY")
if not backend_api_key:
raise ValueError(
"XAI_API_KEY environment variable must be set for XAI backend"
)
else:
raise ValueError(f"Unsupported backend: {backend}")
logger.info(f" Worker URL: {worker_url}")
# Build router command
router_cmd = [
"python3",
"-m",
"sglang_router.launch_router",
"--host",
"127.0.0.1",
"--port",
str(router_port),
"--backend",
"openai",
"--worker-urls",
worker_url,
"--history-backend",
history_backend,
"--log-level",
"warn",
]
# Note: Not adding --api-key to router command for local testing
# The router will not require authentication
# Add Prometheus port to avoid conflicts (use unique port or disable)
if prometheus_port is None:
# Auto-assign a unique prometheus port based on router port
prometheus_port = router_port + 1000
router_cmd.extend(["--prometheus-port", str(prometheus_port)])
# Add router-specific args
if router_args:
router_cmd.extend(router_args)
if show_output:
logger.info(f" Command: {' '.join(router_cmd)}")
# Set up environment with backend API key
env = os.environ.copy()
if backend == "openai":
env["OPENAI_API_KEY"] = backend_api_key
else:
env["XAI_API_KEY"] = backend_api_key
# Launch router
if show_output:
router_proc = subprocess.Popen(
router_cmd,
env=env,
stdout=stdout,
stderr=stderr,
)
else:
router_proc = subprocess.Popen(
router_cmd,
stdout=stdout if stdout is not None else subprocess.PIPE,
stderr=stderr if stderr is not None else subprocess.PIPE,
env=env,
)
print(f" PID: {router_proc.pid}")
# Wait for router to be ready
router_url = f"http://127.0.0.1:{router_port}"
print(f"\nWaiting for router to start at {router_url}...")
try:
wait_for_router_ready(router_url, timeout=timeout, api_key=None)
logger.info(f"✓ Router ready at {router_url}")
except TimeoutError:
logger.error(f"✗ Router failed to start")
# Cleanup: kill router
try:
router_proc.kill()
except:
pass
raise
logger.info(f"\n{'='*70}")
logger.info(f"{backend.upper()} router ready!")
logger.info(f" Router: {router_url}")
logger.info(f"{'='*70}\n")
return {
"router": router_proc,
"base_url": router_url,
}
def popen_launch_workers_and_router(
model: str,
base_url: str,
timeout: int = 300,
num_workers: int = 2,
policy: str = "round_robin",
api_key: Optional[str] = None,
worker_args: Optional[list] = None,
router_args: Optional[list] = None,
tp_size: int = 1,
env: Optional[dict] = None,
stdout=None,
stderr=None,
) -> dict:
"""
Launch SGLang workers and gRPC router separately.
This approach:
1. Starts N SGLang workers with --grpc-mode flag
2. Waits for workers to initialize (process startup)
3. Starts a gRPC router pointing to those workers
4. Waits for router health check to pass (router validates worker connectivity)
This matches production deployment patterns better than the integrated approach.
Args:
model: Model path (e.g., /home/ubuntu/models/llama-3.1-8b-instruct)
base_url: Base URL for router (e.g., "http://127.0.0.1:8080")
timeout: Timeout for server startup (default: 300s)
num_workers: Number of workers to launch
policy: Routing policy (round_robin, random, power_of_two, cache_aware)
api_key: Optional API key for router
worker_args: Additional arguments for workers (e.g., ["--context-len", "8192"])
router_args: Additional arguments for router (e.g., ["--max-total-token", "1536"])
tp_size: Tensor parallelism size for workers (default: 1)
env: Optional environment variables for workers (e.g., {"SGLANG_CLIP_MAX_NEW_TOKENS_ESTIMATION": "256"})
stdout: Optional file handle for worker stdout (default: subprocess.PIPE)
stderr: Optional file handle for worker stderr (default: subprocess.PIPE)
Returns:
dict with:
- workers: list of worker process objects
- worker_urls: list of gRPC worker URLs
- router: router process object
- base_url: router URL (HTTP endpoint)
Example:
>>> cluster = popen_launch_workers_and_router(model, base_url, num_workers=2)
>>> # Use cluster['base_url'] for HTTP requests
>>> # Cleanup:
>>> for worker in cluster['workers']:
>>> kill_process_tree(worker.pid)
>>> kill_process_tree(cluster['router'].pid)
"""
show_output = os.environ.get("SHOW_ROUTER_LOGS", "0") == "1"
# Parse router port from base_url
if ":" in base_url.split("//")[-1]:
router_port = int(base_url.split(":")[-1])
else:
router_port = find_free_port()
logger.info(f"\n{'='*70}")
logger.info(f"Launching gRPC cluster (separate workers + router)")
logger.info(f"{'='*70}")
logger.info(f" Model: {model}")
logger.info(f" Router port: {router_port}")
logger.info(f" Workers: {num_workers}")
logger.info(f" TP size: {tp_size}")
logger.info(f" Policy: {policy}")
# Step 1: Launch workers with gRPC enabled
workers = []
worker_urls = []
for i in range(num_workers):
worker_port = find_free_port()
worker_url = f"grpc://127.0.0.1:{worker_port}"
worker_urls.append(worker_url)
logger.info(f"\n[Worker {i+1}/{num_workers}]")
logger.info(f" Port: {worker_port}")
logger.info(f" URL: {worker_url}")
# Build worker command
worker_cmd = [
"python3",
"-m",
"sglang.launch_server",
"--model-path",
model,
"--host",
"127.0.0.1",
"--port",
str(worker_port),
"--grpc-mode", # Enable gRPC for this worker
"--mem-fraction-static",
"0.8",
"--attention-backend",
"fa3",
]
# Add TP size
if tp_size > 1:
worker_cmd.extend(["--tp-size", str(tp_size)])
# Add worker-specific args
if worker_args:
worker_cmd.extend(worker_args)
# Launch worker with optional environment variables
if show_output:
worker_proc = subprocess.Popen(
worker_cmd,
env=env,
stdout=stdout,
stderr=stderr,
)
else:
worker_proc = subprocess.Popen(
worker_cmd,
stdout=stdout if stdout is not None else subprocess.PIPE,
stderr=stderr if stderr is not None else subprocess.PIPE,
env=env,
)
workers.append(worker_proc)
logger.info(f" PID: {worker_proc.pid}")
# Give workers a moment to start binding to ports
# The router will check worker health when it starts
logger.info(f"\nWaiting for {num_workers} workers to initialize (20s)...")
time.sleep(20)
# Quick check: make sure worker processes are still alive
for i, worker in enumerate(workers):
if worker.poll() is not None:
logger.error(
f" ✗ Worker {i+1} died during startup (exit code: {worker.poll()})"
)
# Cleanup: kill all workers
for w in workers:
try:
w.kill()
except:
pass
raise RuntimeError(f"Worker {i+1} failed to start")
logger.info(
f"✓ All {num_workers} workers started (router will verify connectivity)"
)
# Step 2: Launch router pointing to workers
logger.info(f"\n[Router]")
logger.info(f" Port: {router_port}")
logger.info(f" Worker URLs: {', '.join(worker_urls)}")
# Build router command
router_cmd = [
"python3",
"-m",
"sglang_router.launch_router",
"--host",
"127.0.0.1",
"--port",
str(router_port),
"--prometheus-port",
"9321",
"--policy",
policy,
"--model-path",
model,
"--log-level",
"warn",
]
# Add worker URLs
router_cmd.append("--worker-urls")
router_cmd.extend(worker_urls)
# Add API key
if api_key:
router_cmd.extend(["--api-key", api_key])
# Add router-specific args
if router_args:
router_cmd.extend(router_args)
if show_output:
logger.info(f" Command: {' '.join(router_cmd)}")
# Launch router
if show_output:
router_proc = subprocess.Popen(router_cmd)
else:
router_proc = subprocess.Popen(
router_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
logger.info(f" PID: {router_proc.pid}")
# Wait for router to be ready
router_url = f"http://127.0.0.1:{router_port}"
logger.info(f"\nWaiting for router to start at {router_url}...")
try:
wait_for_workers_ready(
router_url, expected_workers=num_workers, timeout=180, api_key=api_key
)
logger.info(f"✓ Router ready at {router_url}")
except TimeoutError:
logger.error(f"✗ Router failed to start")
# Cleanup: kill router and all workers
try:
router_proc.kill()
except:
pass
for worker in workers:
try:
worker.kill()
except:
pass
raise
logger.info(f"\n{'='*70}")
logger.info(f"✓ gRPC cluster ready!")
logger.info(f" Router: {router_url}")
logger.info(f" Workers: {len(workers)}")
logger.info(f"{'='*70}\n")
return {
"workers": workers,
"worker_urls": worker_urls,
"router": router_proc,
"base_url": router_url,
}