568 lines
18 KiB
Python
568 lines
18 KiB
Python
"""
|
|
Fixtures for launching OpenAI/XAI router for response API e2e testing.
|
|
|
|
This module provides fixtures for launching SGLang router with OpenAI or XAI backends:
|
|
1. Launch router with --backend openai pointing to OpenAI or XAI API
|
|
2. Configure history backend (memory or oracle)
|
|
|
|
This supports testing the Response API against real cloud providers.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import time
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def wait_for_workers_ready(
    router_url: str,
    expected_workers: int,
    timeout: int = 300,
    api_key: Optional[str] = None,
) -> None:
    """
    Wait for router to have all workers connected.

    Polls ``GET {router_url}/workers`` roughly once per second until the
    ``total`` field of the JSON response equals ``expected_workers``.

    Example response from /workers endpoint:
        {"workers":[],"total":0,"stats":{"prefill_count":0,"decode_count":0,"regular_count":0}}

    Args:
        router_url: Base URL of router (e.g., "http://127.0.0.1:30000")
        expected_workers: Number of workers expected to be connected
        timeout: Max seconds to wait
        api_key: Optional API key; when set it is sent as a Bearer token

    Raises:
        TimeoutError: If the expected worker count is not observed within
            ``timeout`` seconds. The message includes the last observed status.
    """
    progress_interval = 10  # seconds between "still waiting" log lines

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    start_time = time.monotonic()
    deadline = start_time + timeout
    next_progress_log = start_time + progress_interval
    last_error = None

    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    with requests.Session() as session:
        while time.monotonic() < deadline:
            now = time.monotonic()
            elapsed = int(now - start_time)

            # Time-based throttle. Tying the log to an attempt counter is
            # unreliable because an attempt can take anywhere from a few
            # milliseconds up to the 5s request timeout.
            if now >= next_progress_log:
                logger.info(
                    f"  Still waiting for workers... ({elapsed}/{timeout}s elapsed)"
                )
                next_progress_log = now + progress_interval

            try:
                response = session.get(
                    f"{router_url}/workers", headers=headers, timeout=5
                )
                if response.status_code == 200:
                    data = response.json()
                    total_workers = data.get("total", 0)

                    if total_workers == expected_workers:
                        logger.info(
                            f"  All {expected_workers} workers connected after {elapsed}s"
                        )
                        return
                    last_error = f"Workers: {total_workers}/{expected_workers}"
                else:
                    last_error = f"HTTP {response.status_code}"
            except requests.ConnectionError:
                last_error = "Connection refused (router not ready yet)"
            except requests.Timeout:
                last_error = "Timeout"
            except requests.RequestException as e:
                last_error = str(e)
            except (ValueError, KeyError, AttributeError) as e:
                # ValueError: body is not JSON; AttributeError: JSON is not an object.
                last_error = f"Invalid response: {e}"

            time.sleep(1)

    raise TimeoutError(
        f"Router at {router_url} did not get {expected_workers} workers within {timeout}s.\n"
        f"Last status: {last_error}\n"
        "Hint: Run with SHOW_ROUTER_LOGS=1 to see startup logs"
    )
|
|
|
|
|
|
def find_free_port() -> int:
    """Return a TCP port number that was free on 127.0.0.1 when probed.

    Binds a throwaway socket to port 0 so the OS chooses the port, reads the
    chosen port back, and closes the socket before returning.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("127.0.0.1", 0))
        _host, port = probe.getsockname()
        return port
|
|
|
|
|
|
def wait_for_router_ready(
    router_url: str,
    timeout: int = 60,
    api_key: Optional[str] = None,
) -> None:
    """
    Wait for router to be ready.

    Polls ``GET {router_url}/health`` roughly once per second until it
    returns HTTP 200.

    Args:
        router_url: Base URL of router (e.g., "http://127.0.0.1:30000")
        timeout: Max seconds to wait
        api_key: Optional API key; when set it is sent as a Bearer token

    Raises:
        TimeoutError: If /health does not return 200 within ``timeout``
            seconds. The message includes the last observed status.
    """
    progress_interval = 10  # seconds between "still waiting" log lines

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    start_time = time.monotonic()
    deadline = start_time + timeout
    next_progress_log = start_time + progress_interval
    last_error = None

    headers = {}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    with requests.Session() as session:
        while time.monotonic() < deadline:
            now = time.monotonic()
            elapsed = int(now - start_time)

            # Time-based throttle. Tying the log to an attempt counter is
            # unreliable because an attempt can take anywhere from a few
            # milliseconds up to the 5s request timeout.
            if now >= next_progress_log:
                logger.info(
                    f"  Still waiting for router... ({elapsed}/{timeout}s elapsed)"
                )
                next_progress_log = now + progress_interval

            try:
                response = session.get(
                    f"{router_url}/health", headers=headers, timeout=5
                )
                if response.status_code == 200:
                    logger.info(f"  Router ready after {elapsed}s")
                    return
                last_error = f"HTTP {response.status_code}"
            except requests.ConnectionError:
                last_error = "Connection refused (router not ready yet)"
            except requests.Timeout:
                last_error = "Timeout"
            except requests.RequestException as e:
                last_error = str(e)

            time.sleep(1)

    raise TimeoutError(
        f"Router at {router_url} did not become ready within {timeout}s.\n"
        f"Last status: {last_error}\n"
        "Hint: Run with SHOW_ROUTER_LOGS=1 to see startup logs"
    )
|
|
|
|
|
|
def popen_launch_openai_xai_router(
    backend: str,  # "openai" or "xai"
    base_url: str,
    timeout: int = 60,
    history_backend: str = "memory",
    api_key: Optional[str] = None,
    router_args: Optional[list] = None,
    stdout=None,
    stderr=None,
    prometheus_port: Optional[int] = None,
) -> dict:
    """
    Launch SGLang router with OpenAI or XAI backend.

    This approach:
    1. Starts router with --backend openai
    2. Points to OpenAI or XAI API via --worker-urls
    3. Configures history backend (memory or oracle)
    4. Waits for router health check to pass

    Args:
        backend: "openai" or "xai"
        base_url: Base URL for router (e.g., "http://127.0.0.1:30000"). Only
            the port is used; if the URL has no port a free one is picked.
        timeout: Timeout for router startup (default: 60s)
        history_backend: "memory" or "oracle" (default: memory)
        api_key: Optional API key for router authentication. Currently unused:
            the router is launched without --api-key and the readiness probe
            sends no Authorization header.
        router_args: Additional arguments for router
        stdout: Optional file handle for router stdout
        stderr: Optional file handle for router stderr
        prometheus_port: Port passed as --prometheus-port. Defaults to
            router port + 1000 (or a free port if that would exceed 65535).

    Returns:
        dict with:
        - router: router process object
        - base_url: router URL (HTTP endpoint), always http://127.0.0.1:<port>

    Raises:
        ValueError: If ``backend`` is unsupported or the matching API key
            environment variable (OPENAI_API_KEY / XAI_API_KEY) is not set.
        TimeoutError: If the router does not become healthy within ``timeout``
            seconds; the router process is killed before re-raising.

    Example:
        >>> cluster = popen_launch_openai_xai_router(
        ...     "openai", "http://127.0.0.1:30000"
        ... )
        >>> # Use cluster['base_url'] for HTTP requests
        >>> # Cleanup:
        >>> kill_process_tree(cluster['router'].pid)
    """
    from urllib.parse import urlparse

    # backend -> (worker URL, env var holding the API key, display name)
    backend_table = {
        "openai": ("https://api.openai.com", "OPENAI_API_KEY", "OpenAI"),
        "xai": ("https://api.x.ai", "XAI_API_KEY", "XAI"),
    }

    # Validate inputs first so misconfiguration fails before any side effects.
    if backend not in backend_table:
        raise ValueError(f"Unsupported backend: {backend}")
    worker_url, key_var, display_name = backend_table[backend]
    backend_api_key = os.environ.get(key_var)
    if not backend_api_key:
        raise ValueError(
            f"{key_var} environment variable must be set for {display_name} backend"
        )

    show_output = os.environ.get("SHOW_ROUTER_LOGS", "0") == "1"

    # Parse router port from base_url. urlparse copes with trailing slashes and
    # paths; scheme-less input ("host:port") needs a "//" prefix to be read as
    # a network location.
    parsed = urlparse(base_url if "//" in base_url else f"//{base_url}")
    router_port = parsed.port if parsed.port is not None else find_free_port()

    logger.info(f"\n{'='*70}")
    logger.info(f"Launching {backend.upper()} router")
    logger.info(f"{'='*70}")
    logger.info(f"  Backend: {backend}")
    logger.info(f"  Router port: {router_port}")
    logger.info(f"  History backend: {history_backend}")
    logger.info(f"  Worker URL: {worker_url}")

    # Build router command
    router_cmd = [
        "python3",
        "-m",
        "sglang_router.launch_router",
        "--host",
        "127.0.0.1",
        "--port",
        str(router_port),
        "--backend",
        "openai",
        "--worker-urls",
        worker_url,
        "--history-backend",
        history_backend,
        "--log-level",
        "warn",
    ]

    # Note: Not adding --api-key to router command for local testing
    # The router will not require authentication

    # Add Prometheus port to avoid conflicts between concurrently running routers
    if prometheus_port is None:
        # Auto-assign a unique prometheus port based on router port
        prometheus_port = router_port + 1000
        if prometheus_port > 65535:
            # router_port + 1000 is not a valid TCP port; pick any free one.
            prometheus_port = find_free_port()
    router_cmd.extend(["--prometheus-port", str(prometheus_port)])

    # Add router-specific args
    if router_args:
        router_cmd.extend(router_args)

    if show_output:
        logger.info(f"  Command: {' '.join(router_cmd)}")

    # Set up environment with backend API key
    env = os.environ.copy()
    env[key_var] = backend_api_key

    # Launch router. With SHOW_ROUTER_LOGS=1 the child inherits our streams
    # unless handles were given; otherwise output is captured in pipes.
    # NOTE(review): nothing in this function drains those pipes, so a chatty
    # child could block once the pipe buffer fills — confirm callers read them.
    default_stream = None if show_output else subprocess.PIPE
    router_proc = subprocess.Popen(
        router_cmd,
        env=env,
        stdout=stdout if stdout is not None else default_stream,
        stderr=stderr if stderr is not None else default_stream,
    )

    logger.info(f"  PID: {router_proc.pid}")

    # Wait for router to be ready
    router_url = f"http://127.0.0.1:{router_port}"
    logger.info(f"\nWaiting for router to start at {router_url}...")

    try:
        wait_for_router_ready(router_url, timeout=timeout, api_key=None)
        logger.info(f"✓ Router ready at {router_url}")
    except TimeoutError:
        logger.error("✗ Router failed to start")
        # Cleanup: kill router and reap it so no zombie process is left behind
        try:
            router_proc.kill()
            router_proc.wait(timeout=5)
        except (OSError, subprocess.TimeoutExpired) as cleanup_err:
            logger.warning(f"  Router cleanup incomplete: {cleanup_err}")
        raise

    logger.info(f"\n{'='*70}")
    logger.info(f"✓ {backend.upper()} router ready!")
    logger.info(f"  Router: {router_url}")
    logger.info(f"{'='*70}\n")

    return {
        "router": router_proc,
        "base_url": router_url,
    }
|
|
|
|
|
|
def popen_launch_workers_and_router(
    model: str,
    base_url: str,
    timeout: int = 300,
    num_workers: int = 2,
    policy: str = "round_robin",
    api_key: Optional[str] = None,
    worker_args: Optional[list] = None,
    router_args: Optional[list] = None,
    tp_size: int = 1,
    env: Optional[dict] = None,
    stdout=None,
    stderr=None,
    prometheus_port: Optional[int] = None,
) -> dict:
    """
    Launch SGLang workers and gRPC router separately.

    This approach:
    1. Starts N SGLang workers with --grpc-mode flag
    2. Sleeps 20s, then checks that no worker process has exited
    3. Starts a router pointing to those workers
    4. Polls the router's /workers endpoint until it reports ``num_workers``

    This matches production deployment patterns better than the integrated approach.

    Args:
        model: Model path (e.g., /home/ubuntu/models/llama-3.1-8b-instruct)
        base_url: Base URL for router (e.g., "http://127.0.0.1:8080"). Only
            the port is used; if the URL has no port a free one is picked.
        timeout: Max seconds to wait for the router to report all workers
            (default: 300s)
        num_workers: Number of workers to launch
        policy: Routing policy (round_robin, random, power_of_two, cache_aware)
        api_key: Optional API key for router (passed as --api-key and used
            for the readiness probe)
        worker_args: Additional arguments for workers (e.g., ["--context-len", "8192"])
        router_args: Additional arguments for router (e.g., ["--max-total-token", "1536"])
        tp_size: Tensor parallelism size for workers (default: 1)
        env: Optional extra environment variables for workers
            (e.g., {"SGLANG_CLIP_MAX_NEW_TOKENS_ESTIMATION": "256"}); merged
            on top of the current process environment
        stdout: Optional file handle for worker stdout (default: subprocess.PIPE)
        stderr: Optional file handle for worker stderr (default: subprocess.PIPE)
        prometheus_port: Port passed to the router as --prometheus-port
            (default: 9321)

    Returns:
        dict with:
        - workers: list of worker process objects
        - worker_urls: list of gRPC worker URLs
        - router: router process object
        - base_url: router URL (HTTP endpoint)

    Raises:
        RuntimeError: If a worker process exits during the initial 20s wait.
        TimeoutError: If the router does not report all workers in time.
        In both cases every process started so far is killed first.

    Example:
        >>> cluster = popen_launch_workers_and_router(model, base_url, num_workers=2)
        >>> # Use cluster['base_url'] for HTTP requests
        >>> # Cleanup:
        >>> for worker in cluster['workers']:
        >>>     kill_process_tree(worker.pid)
        >>> kill_process_tree(cluster['router'].pid)
    """
    from urllib.parse import urlparse

    def _kill_quietly(procs):
        # Best-effort cleanup: a process that is already gone is not an error.
        for proc in procs:
            try:
                proc.kill()
            except OSError as cleanup_err:
                logger.warning(
                    f"  Could not kill PID {proc.pid}: {cleanup_err}"
                )

    show_output = os.environ.get("SHOW_ROUTER_LOGS", "0") == "1"

    # Parse router port from base_url. urlparse copes with trailing slashes and
    # paths; scheme-less input ("host:port") needs a "//" prefix to be read as
    # a network location.
    parsed = urlparse(base_url if "//" in base_url else f"//{base_url}")
    router_port = parsed.port if parsed.port is not None else find_free_port()

    logger.info(f"\n{'='*70}")
    logger.info("Launching gRPC cluster (separate workers + router)")
    logger.info(f"{'='*70}")
    logger.info(f"  Model: {model}")
    logger.info(f"  Router port: {router_port}")
    logger.info(f"  Workers: {num_workers}")
    logger.info(f"  TP size: {tp_size}")
    logger.info(f"  Policy: {policy}")

    # Popen's env replaces the whole environment, so layer the caller's extra
    # variables over ours to keep PATH, CUDA settings, etc.
    worker_env = None if env is None else {**os.environ, **env}

    # With SHOW_ROUTER_LOGS=1 children inherit our streams unless handles were
    # given; otherwise output is captured in pipes.
    # NOTE(review): nothing in this function drains those pipes, so a chatty
    # child could block once the pipe buffer fills — confirm callers read them.
    default_stream = None if show_output else subprocess.PIPE

    # Step 1: Launch workers with gRPC enabled
    workers = []
    worker_urls = []
    # The probe socket is closed before the worker binds, so consecutive probes
    # may return the same port; track what has already been handed out.
    reserved_ports = {router_port}

    for i in range(num_workers):
        worker_port = find_free_port()
        while worker_port in reserved_ports:
            worker_port = find_free_port()
        reserved_ports.add(worker_port)

        worker_url = f"grpc://127.0.0.1:{worker_port}"
        worker_urls.append(worker_url)

        logger.info(f"\n[Worker {i+1}/{num_workers}]")
        logger.info(f"  Port: {worker_port}")
        logger.info(f"  URL: {worker_url}")

        # Build worker command
        worker_cmd = [
            "python3",
            "-m",
            "sglang.launch_server",
            "--model-path",
            model,
            "--host",
            "127.0.0.1",
            "--port",
            str(worker_port),
            "--grpc-mode",  # Enable gRPC for this worker
            "--mem-fraction-static",
            "0.8",
            "--attention-backend",
            "fa3",
        ]

        # Add TP size
        if tp_size > 1:
            worker_cmd.extend(["--tp-size", str(tp_size)])

        # Add worker-specific args
        if worker_args:
            worker_cmd.extend(worker_args)

        # Launch worker; if spawning fails, do not leak the ones already running
        try:
            worker_proc = subprocess.Popen(
                worker_cmd,
                env=worker_env,
                stdout=stdout if stdout is not None else default_stream,
                stderr=stderr if stderr is not None else default_stream,
            )
        except OSError:
            _kill_quietly(workers)
            raise

        workers.append(worker_proc)
        logger.info(f"  PID: {worker_proc.pid}")

    # Give workers a moment to start binding to ports
    # The router will check worker health when it starts
    logger.info(f"\nWaiting for {num_workers} workers to initialize (20s)...")
    time.sleep(20)

    # Quick check: make sure worker processes are still alive
    for i, worker in enumerate(workers):
        exit_code = worker.poll()
        if exit_code is not None:
            logger.error(
                f"  ✗ Worker {i+1} died during startup (exit code: {exit_code})"
            )
            # Cleanup: kill all workers
            _kill_quietly(workers)
            raise RuntimeError(f"Worker {i+1} failed to start")

    logger.info(
        f"✓ All {num_workers} workers started (router will verify connectivity)"
    )

    # Step 2: Launch router pointing to workers
    logger.info("\n[Router]")
    logger.info(f"  Port: {router_port}")
    logger.info(f"  Worker URLs: {', '.join(worker_urls)}")

    # Build router command
    router_cmd = [
        "python3",
        "-m",
        "sglang_router.launch_router",
        "--host",
        "127.0.0.1",
        "--port",
        str(router_port),
        "--prometheus-port",
        str(9321 if prometheus_port is None else prometheus_port),
        "--policy",
        policy,
        "--model-path",
        model,
        "--log-level",
        "warn",
    ]

    # Add worker URLs
    router_cmd.append("--worker-urls")
    router_cmd.extend(worker_urls)

    # Add API key
    if api_key:
        router_cmd.extend(["--api-key", api_key])

    # Add router-specific args
    if router_args:
        router_cmd.extend(router_args)

    if show_output:
        logger.info(f"  Command: {' '.join(router_cmd)}")

    # Launch router (the caller's stdout/stderr handles apply to workers only)
    try:
        if show_output:
            router_proc = subprocess.Popen(router_cmd)
        else:
            router_proc = subprocess.Popen(
                router_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
    except OSError:
        _kill_quietly(workers)
        raise

    logger.info(f"  PID: {router_proc.pid}")

    # Wait for router to be ready
    router_url = f"http://127.0.0.1:{router_port}"
    logger.info(f"\nWaiting for router to start at {router_url}...")

    try:
        # Honour the caller's timeout instead of a hard-coded value.
        wait_for_workers_ready(
            router_url, expected_workers=num_workers, timeout=timeout, api_key=api_key
        )
        logger.info(f"✓ Router ready at {router_url}")
    except TimeoutError:
        logger.error("✗ Router failed to start")
        # Cleanup: kill router and all workers
        _kill_quietly([router_proc, *workers])
        raise

    logger.info(f"\n{'='*70}")
    logger.info("✓ gRPC cluster ready!")
    logger.info(f"  Router: {router_url}")
    logger.info(f"  Workers: {len(workers)}")
    logger.info(f"{'='*70}\n")

    return {
        "workers": workers,
        "worker_urls": worker_urls,
        "router": router_proc,
        "base_url": router_url,
    }
|