From 480d1b8b203ad54712eaf65d7e5cd5e74c8b836a Mon Sep 17 00:00:00 2001
From: Keyang Ru
Date: Thu, 11 Sep 2025 12:04:11 -0700
Subject: [PATCH] [router] add benchmark for regular router and pd router
 (#10280)
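
Add genai-bench benchmarks to the router e2e suite: the regular-router
scenario attaches two dp=2 workers to a round-robin router, the PD scenario
runs 2 prefill + 2 decode workers behind a PD router, and a follow-up CI job
aggregates the per-scenario JSON results into the GitHub step summary.

Sketch of a local run (an assumption, not a tested recipe: it presumes a
4-GPU host and simply mirrors the CI steps below; the genai-bench pin and
pytest flags are the ones used by the workflow):

    cd sgl-router
    python3 -m pip install genai-bench==0.0.2
    pytest -m e2e -s -vv -o log_cli=true --log-cli-level=INFO

Results land in benchmark_round_robin_regular/ and benchmark_round_robin_pd/
under the working directory.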
---
 .github/workflows/pr-test-rust.yml            |  77 ++++-
 sgl-router/py_test/e2e/conftest.py            | 279 +++++++++++++++++-
 sgl-router/py_test/e2e/test_pd_router.py      |  76 +++--
 sgl-router/py_test/e2e/test_regular_router.py |  35 ++-
 4 files changed, 434 insertions(+), 33 deletions(-)

diff --git a/.github/workflows/pr-test-rust.yml b/.github/workflows/pr-test-rust.yml
index 33d98f176..60c7ebdf2 100644
--- a/.github/workflows/pr-test-rust.yml
+++ b/.github/workflows/pr-test-rust.yml
@@ -65,10 +65,10 @@ jobs:
           # Run quick benchmarks to ensure they work using Python script
           python3 scripts/run_benchmarks.py --quick
 
-  e2e-python:
+  pytest-rust:
     if: github.repository == 'sgl-project/sglang' || github.event_name == 'pull_request'
     runs-on: BM.A10.4
-    timeout-minutes: 35
+    timeout-minutes: 25
     steps:
       - name: Checkout code
        uses: actions/checkout@v4
@@ -109,11 +109,82 @@ jobs:
         run: |
           bash scripts/killall_sglang.sh "nuk_gpus"
           cd sgl-router
+          python3 -m pip --no-cache-dir install --upgrade --ignore-installed blinker
+          python3 -m pip --no-cache-dir install --upgrade --break-system-packages genai-bench==0.0.2
           pytest -m e2e -s -vv -o log_cli=true --log-cli-level=INFO
+      - name: Upload benchmark results
+        if: success()
+        uses: actions/upload-artifact@v4
+        with:
+          name: genai-bench-results-all-policies
+          path: sgl-router/benchmark_**/
 
   finish:
-    needs: [unit-test-rust, e2e-python]
+    needs: [unit-test-rust, pytest-rust]
     runs-on: ubuntu-latest
     steps:
       - name: Finish
         run: echo "This is an empty step to ensure that all jobs are completed."
+
+  summarize-benchmarks:
+    needs: pytest-rust
+    runs-on: ubuntu-latest
+    if: success()
+
+    steps:
+      - name: Install jq
+        run: sudo apt-get update && sudo apt-get install -y jq bc
+
+      - name: Download benchmark results
+        uses: actions/download-artifact@v4
+        with:
+          name: genai-bench-results-all-policies
+
+      - name: List downloaded contents
+        run: |
+          echo "Contents after download:"
+          ls -la
+          find . -name "benchmark_*" -type d
+          echo "JSON files found:"
+          find . -name "*.json" | head -10
+
+      - name: Create benchmark summary
+        run: |
+          echo "=== DEBUG: Creating benchmark summary ==="
+          echo "Available benchmark directories:"
+          find . -name "benchmark_*" -type d || true
+          echo "=========================================="
+
+          echo "## Router E2E Genai-Bench Results Summary" >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "Results captured from E2E tests for two scenarios: regular router (2 workers, dp=2) and PD router (2 prefill + 2 decode)." >> $GITHUB_STEP_SUMMARY
+          echo "" >> $GITHUB_STEP_SUMMARY
+          echo "| Scenario | Status | TTFT (s) | E2E Latency (s) | Input Throughput (tok/s) | Output Throughput (tok/s) |" >> $GITHUB_STEP_SUMMARY
+          echo "|----------|--------|----------|-----------------|--------------------------|---------------------------|" >> $GITHUB_STEP_SUMMARY
+
+          scenarios=$'Regular (dp=2, round_robin)|benchmark_round_robin_regular\nPD (2 prefill + 2 decode, round_robin)|benchmark_round_robin_pd'
+
+          echo "$scenarios" | sed 's/^\s*//' | while IFS='|' read -r label pattern; do
+            [ -z "$label" ] && continue
+            # Find the result folder (handle different extraction layouts)
+            result_folder=$(find . -maxdepth 3 \( -name "$pattern" -o -path "*${pattern}*" \) -type d | head -1)
+
+            if [ -n "$result_folder" ] && [ -d "$result_folder" ]; then
+              json_file=$(find "$result_folder" -name "*.json" -not -name "experiment_metadata.json" | head -1)
+
+              if [ -n "$json_file" ] && [ -f "$json_file" ]; then
+                ttft_mean=$(jq -r '.aggregated_metrics.stats.ttft.mean' "$json_file")
+                e2e_latency_mean=$(jq -r '.aggregated_metrics.stats.e2e_latency.mean' "$json_file")
+                input_throughput_mean=$(jq -r '.aggregated_metrics.stats.input_throughput.mean' "$json_file")
+                output_throughput_mean=$(jq -r '.aggregated_metrics.stats.output_throughput.mean' "$json_file")
+
+                ttft_display=$(printf "%.2f" "$ttft_mean" 2>/dev/null || echo "$ttft_mean")
+                e2e_display=$(printf "%.2f" "$e2e_latency_mean" 2>/dev/null || echo "$e2e_latency_mean")
+                input_display=$(printf "%.0f" "$input_throughput_mean" 2>/dev/null || echo "$input_throughput_mean")
+                output_display=$(printf "%.0f" "$output_throughput_mean" 2>/dev/null || echo "$output_throughput_mean")
+
+                echo "| ${label} | ✅ Success | $ttft_display | $e2e_display | $input_display | $output_display |" >> $GITHUB_STEP_SUMMARY
+              fi
+            fi
+          done
diff --git a/sgl-router/py_test/e2e/conftest.py b/sgl-router/py_test/e2e/conftest.py
index 02eea55d4..7987f328c 100644
--- a/sgl-router/py_test/e2e/conftest.py
+++ b/sgl-router/py_test/e2e/conftest.py
@@ -1,7 +1,14 @@
+import json
+import logging
+import os
+import shutil
+import signal
 import socket
 import subprocess
 import time
+from pathlib import Path
 from types import SimpleNamespace
+from typing import Callable, Optional
 from urllib.parse import urlparse
 
 import pytest
@@ -13,6 +20,8 @@ from sglang.test.test_utils import (
     DEFAULT_URL_FOR_TEST,
 )
 
+logger = logging.getLogger(__name__)
+
 
 def _find_available_port() -> int:
     with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
@@ -89,6 +98,7 @@ def _popen_launch_worker(
     *,
     dp_size: int | None = None,
     api_key: str | None = None,
+    base_gpu_id: int | None = 0,
 ) -> subprocess.Popen:
     host, port = _parse_url(base_url)
 
@@ -103,7 +113,7 @@ def _popen_launch_worker(
         "--port",
         port,
         "--base-gpu-id",
-        "0",
+        str(base_gpu_id or 0),
     ]
     if dp_size is not None:
         cmd += ["--dp-size", str(dp_size)]
@@ -161,6 +171,250 @@ def _terminate(proc: subprocess.Popen, timeout: float = 120) -> None:
         time.sleep(1)
 
 
+def _which(cmd: str) -> Optional[str]:
+    try:
+        return shutil.which(cmd)
+    except Exception as e:
+        logger.warning("shutil.which(%r) failed: %s", cmd, e)
+        return None
+
+
+def _graceful_stop_popen(p: subprocess.Popen) -> None:
+    if p is None:
+        return
+    try:
+        if p.poll() is None:
+            p.terminate()
+            for _ in range(5):
+                if p.poll() is not None:
+                    break
+                time.sleep(1)
+            if p.poll() is None:
+                p.kill()
+    except Exception as e:
+        logger.warning("Exception during graceful stop of popen: %s", e)
+
+
+def _pid_alive(pid: int) -> bool:
+    try:
+        os.kill(pid, 0)
+        return True
+    except Exception:
+        return False
+
+
+def _graceful_stop_pid(pid: int) -> None:
+    try:
+        if _pid_alive(pid):
+            try:
+                os.kill(pid, signal.SIGTERM)
+            except Exception:
+                pass
+            for _ in range(5):
+                if not _pid_alive(pid):
+                    break
+                time.sleep(1)
+            if _pid_alive(pid):
+                try:
+                    os.kill(pid, signal.SIGKILL)
+                except Exception:
+                    pass
+    except Exception:
+        pass
+
+
+def _graceful_stop_any(obj) -> None:
+    try:
+        if isinstance(obj, subprocess.Popen):
+            _graceful_stop_popen(obj)
+            return
+        if isinstance(obj, int):
+            _graceful_stop_pid(obj)
+            return
+        proc_obj = getattr(obj, "proc", None)
+        if isinstance(proc_obj, subprocess.Popen):
+            _graceful_stop_popen(proc_obj)
+    except Exception:
+        pass
+
+
+@pytest.fixture(scope="session")
+def genai_bench_runner() -> Callable[..., None]:
+    """Provide a callable to run genai-bench and validate metrics.
+
+    Usage in tests:
+        def test(..., genai_bench_runner):
+            genai_bench_runner(router_url=..., model_path=..., experiment_folder=...)
+    """
+
+    def _run(
+        *,
+        router_url: str,
+        model_path: str,
+        experiment_folder: str,
+        timeout_sec: int | None = None,
+        thresholds: dict | None = None,
+        extra_env: dict | None = None,
+        num_concurrency: int = 32,
+        traffic_scenario: str = "D(4000,100)",
+        max_requests_per_run: int | None = None,
+        clean_experiment: bool = True,
+        kill_procs: list | None = None,
+        drain_delay_sec: int = 6,
+    ) -> None:
+        cli = _which("genai-bench")
+        if not cli:
+            pytest.fail(
+                "genai-bench CLI not found; please install it to run benchmarks"
+            )
+
+        # Clean previous experiment folder under current working directory
+        if clean_experiment:
+            exp_dir = Path.cwd() / experiment_folder
+            if exp_dir.exists():
+                shutil.rmtree(exp_dir, ignore_errors=True)
+
+        # Default requests per run if not provided
+        mrr = (
+            max_requests_per_run
+            if max_requests_per_run is not None
+            else num_concurrency * 3
+        )
+
+        cmd = [
+            cli,
+            "benchmark",
+            "--api-backend",
+            "openai",
+            "--api-base",
+            router_url,
+            "--api-key",
+            "dummy-token",
+            "--api-model-name",
+            model_path,
+            "--model-tokenizer",
+            model_path,
+            "--task",
+            "text-to-text",
+            "--num-concurrency",
+            str(num_concurrency),
+            "--traffic-scenario",
+            traffic_scenario,
+            "--max-requests-per-run",
+            str(mrr),
+            "--max-time-per-run",
+            "2",
+            "--experiment-folder-name",
+            experiment_folder,
+            "--experiment-base-dir",
+            str(Path.cwd()),
+        ]
+
+        env = os.environ.copy()
+        if extra_env:
+            env.update(extra_env)
+
+        to = timeout_sec or int(os.environ.get("GENAI_BENCH_TEST_TIMEOUT", "120"))
+        proc = subprocess.Popen(
+            cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
+        )
+        stdout = stderr = ""
+        rc = None
+        try:
+            try:
+                stdout, stderr = proc.communicate(timeout=to)
+            except subprocess.TimeoutExpired:
+                # Kill the CLI process if it does not exit within the timeout
+                try:
+                    proc.kill()
+                except Exception:
+                    pass
+                stdout, stderr = proc.communicate()
+            rc = proc.returncode
+
+            # Prefer the exact path under cwd; fall back to an rglob search
+            base = Path.cwd()
+            direct = base / experiment_folder
+            candidates = [direct] if direct.is_dir() else []
+            if not candidates:
+                for p in base.rglob(experiment_folder):
+                    if p.is_dir() and p.name == experiment_folder:
+                        candidates = [p]
+                        break
+            if not candidates:
+                raise AssertionError(
+                    "Benchmark failed: experiment folder not found: "
+                    f"{experiment_folder}\nExit code: {rc}\nSTDOUT (tail):\n{stdout[-1000:]}\nSTDERR (tail):\n{stderr[-1000:]}"
+                )
+            actual_folder = candidates[0]
+
+            json_files = [
+                p
+                for p in actual_folder.rglob("*.json")
+                if "experiment_metadata" not in p.name
+            ]
+            if not json_files:
+                raise AssertionError(
+                    "Benchmark failed: no JSON results found\n"
+                    f"Exit code: {rc}\nSTDOUT (tail):\n{stdout[-1000:]}\nSTDERR (tail):\n{stderr[-1000:]}"
+                )
+
+            th = thresholds  # None means "log only", no validation
+
+            for jf in json_files:
+                with jf.open("r") as f:
+                    data = json.load(f)
+                stats = data.get("aggregated_metrics", {}).get("stats", {})
+                ttft_mean = float(stats.get("ttft", {}).get("mean", float("inf")))
+                e2e_latency_mean = float(
+                    stats.get("e2e_latency", {}).get("mean", float("inf"))
+                )
+                input_tp_mean = float(stats.get("input_throughput", {}).get("mean", 0.0))
+                output_tp_mean = float(stats.get("output_throughput", {}).get("mean", 0.0))
+
+                logger.info(
+                    "genai-bench[%s] %s ttft_mean=%.3fs e2e_latency_mean=%.3fs input_tp_mean=%.1f tok/s output_tp_mean=%.1f tok/s",
+                    experiment_folder,
+                    jf.name,
+                    ttft_mean,
+                    e2e_latency_mean,
+                    input_tp_mean,
+                    output_tp_mean,
+                )
+
+                if th is not None:
+                    assert (
+                        ttft_mean <= th["ttft_mean_max"]
+                    ), f"TTFT validation failed: {ttft_mean} > {th['ttft_mean_max']} (file={jf.name})"
+                    assert (
+                        e2e_latency_mean <= th["e2e_latency_mean_max"]
+                    ), f"E2E latency validation failed: {e2e_latency_mean} > {th['e2e_latency_mean_max']} (file={jf.name})"
+                    assert (
+                        input_tp_mean >= th["input_throughput_mean_min"]
+                    ), f"Input throughput validation failed: {input_tp_mean} < {th['input_throughput_mean_min']} (file={jf.name})"
+                    assert (
+                        output_tp_mean >= th["output_throughput_mean_min"]
+                    ), f"Output throughput validation failed: {output_tp_mean} < {th['output_throughput_mean_min']} (file={jf.name})"
+
+        finally:
+            # Always attempt to stop workers to avoid resource leakage
+            if kill_procs:
+                # Give router/workers a small grace period to finish any last drains
+                if drain_delay_sec > 0:
+                    try:
+                        time.sleep(drain_delay_sec)
+                    except Exception:
+                        pass
+                for p in kill_procs:
+                    _graceful_stop_any(p)
+                try:
+                    time.sleep(2)
+                except Exception:
+                    pass
+
+    return _run
+
+
 def pytest_configure(config):
     config.addinivalue_line("markers", "e2e: mark as end-to-end test")
 
@@ -233,3 +487,26 @@ def e2e_worker_dp2_api(e2e_model: str, e2e_router_only_rr_dp_aware_api):
         yield SimpleNamespace(proc=proc, url=base_url)
     finally:
         _terminate(proc)
+
+
+@pytest.fixture(scope="session")
+def e2e_two_workers_dp2(e2e_model: str):
+    """Launch two workers, each with dp_size=2, mapped to GPUs [0,1] and [2,3]."""
+    workers = []
+    try:
+        # Worker A on GPUs 0-1
+        port_a = _find_available_port()
+        url_a = f"http://127.0.0.1:{port_a}"
+        proc_a = _popen_launch_worker(e2e_model, url_a, dp_size=2, base_gpu_id=0)
+        workers.append(SimpleNamespace(proc=proc_a, url=url_a))
+
+        # Worker B on GPUs 2-3
+        port_b = _find_available_port()
+        url_b = f"http://127.0.0.1:{port_b}"
+        proc_b = _popen_launch_worker(e2e_model, url_b, dp_size=2, base_gpu_id=2)
+        workers.append(SimpleNamespace(proc=proc_b, url=url_b))
+
+        yield workers
+    finally:
+        for w in workers:
+            _terminate(w.proc)
- """ +@pytest.fixture(scope="module") +def pd_cluster(e2e_model: str): + """Start 2 prefill + 2 decode workers and one PD router, once per module.""" # Environment capability checks: require sgl_kernel and GPU backend try: import sgl_kernel # noqa: F401 @@ -153,8 +155,8 @@ def test_pd_mmlu(e2e_model: str): if not torch.cuda.is_available(): # pragma: no cover - environment dependent pytest.fail("PD e2e requires CUDA backend, but CUDA is not available") - # Start two prefill workers (with bootstrap ports) and two decode workers workers: list[SimpleNamespace] = [] + router_proc = None try: ib_device = _detect_ib_device() @@ -196,14 +198,12 @@ def test_pd_mmlu(e2e_model: str): "--policy", "round_robin", "--pd-disaggregation", - # prefill URLs (explicitly pass 'none' for bootstrap port) ] for url, bport in prefill: cmd += ["--prefill", url, str(bport)] for url in decode: cmd += ["--decode", url] cmd += [ - # prometheus (avoid collisions across tests) "--prometheus-port", str(pport), "--prometheus-host", @@ -211,22 +211,52 @@ def test_pd_mmlu(e2e_model: str): ] router_proc = subprocess.Popen(cmd) - try: - _wait_health(router_url, timeout=180.0) + _wait_health(router_url, timeout=180.0) - # Run a modest MMLU eval through the PD router - args = SimpleNamespace( - base_url=router_url, - model=e2e_model, - eval_name="mmlu", - num_examples=64, - num_threads=32, - temperature=0.1, - ) - metrics = run_eval(args) - assert metrics["score"] >= 0.65 - finally: - _terminate(router_proc) + yield SimpleNamespace( + router_url=router_url, workers=workers, router_proc=router_proc + ) finally: + if router_proc is not None: + _terminate(router_proc) for w in workers: _terminate(w.proc) + + +@pytest.mark.e2e +def test_pd_mmlu(e2e_model: str, pd_cluster): + """ + Launch 4 workers, start a PD router (2 prefill + 2 decode), then run MMLU. + """ + args = SimpleNamespace( + base_url=pd_cluster.router_url, + model=e2e_model, + eval_name="mmlu", + num_examples=64, + num_threads=32, + temperature=0.1, + ) + metrics = run_eval(args) + assert metrics["score"] >= 0.65 + + +@pytest.mark.e2e +def test_pd_genai_bench(e2e_model: str, pd_cluster, genai_bench_runner): + """ + Launch 4 workers, start a PD router (2 prefill + 2 decode), then run a + short genai-bench benchmark and validate aggregate metrics. 
+ """ + # Run genai-bench against the shared router + policy_label = "benchmark_round_robin_pd" + genai_bench_runner( + router_url=pd_cluster.router_url, + model_path=e2e_model, + experiment_folder=policy_label, + thresholds={ + "ttft_mean_max": 12, + "e2e_latency_mean_max": 15, + "input_throughput_mean_min": 400, + "output_throughput_mean_min": 20, + }, + kill_procs=pd_cluster.workers, + ) diff --git a/sgl-router/py_test/e2e/test_regular_router.py b/sgl-router/py_test/e2e/test_regular_router.py index b40c43408..856ecda72 100644 --- a/sgl-router/py_test/e2e/test_regular_router.py +++ b/sgl-router/py_test/e2e/test_regular_router.py @@ -9,13 +9,12 @@ from sglang.test.run_eval import run_eval @pytest.mark.e2e -def test_mmlu(e2e_router_only_rr, e2e_primary_worker, e2e_model): - # Attach the primary worker to a fresh router-only instance (single model) +def test_mmlu(e2e_router_only_rr, e2e_two_workers_dp2, e2e_model): + # Attach two dp=2 workers (total 4 GPUs) to a fresh router-only instance base = e2e_router_only_rr.url - r = requests.post( - f"{base}/add_worker", params={"url": e2e_primary_worker.url}, timeout=180 - ) - r.raise_for_status() + for w in e2e_two_workers_dp2: + r = requests.post(f"{base}/add_worker", params={"url": w.url}, timeout=180) + r.raise_for_status() args = SimpleNamespace( base_url=base, @@ -29,6 +28,30 @@ def test_mmlu(e2e_router_only_rr, e2e_primary_worker, e2e_model): assert metrics["score"] >= 0.65 +@pytest.mark.e2e +def test_genai_bench( + e2e_router_only_rr, e2e_two_workers_dp2, e2e_model, genai_bench_runner +): + """Attach a worker to the regular router and run a short genai-bench.""" + base = e2e_router_only_rr.url + for w in e2e_two_workers_dp2: + r = requests.post(f"{base}/add_worker", params={"url": w.url}, timeout=180) + r.raise_for_status() + + genai_bench_runner( + router_url=base, + model_path=e2e_model, + experiment_folder="benchmark_round_robin_regular", + thresholds={ + "ttft_mean_max": 6, + "e2e_latency_mean_max": 14, + "input_throughput_mean_min": 1000, + "output_throughput_mean_min": 12, + }, + kill_procs=e2e_two_workers_dp2, + ) + + @pytest.mark.e2e def test_add_and_remove_worker_live(e2e_router_only_rr, e2e_primary_worker, e2e_model): base = e2e_router_only_rr.url