From 7b141f816c0802f9d05ca2b16231fca8c17aadfd Mon Sep 17 00:00:00 2001 From: Keyang Ru Date: Thu, 11 Sep 2025 19:26:02 -0700 Subject: [PATCH] [router][ci] Add gpu utilization analyze with nvml (#10345) --- .github/workflows/pr-test-rust.yml | 28 ++ sgl-router/py_test/e2e/conftest.py | 333 ++++++++++++++++-- sgl-router/py_test/e2e/test_pd_router.py | 1 + sgl-router/py_test/e2e/test_regular_router.py | 2 + 4 files changed, 329 insertions(+), 35 deletions(-) diff --git a/.github/workflows/pr-test-rust.yml b/.github/workflows/pr-test-rust.yml index 60c7ebdf2..1f737c794 100644 --- a/.github/workflows/pr-test-rust.yml +++ b/.github/workflows/pr-test-rust.yml @@ -185,6 +185,34 @@ jobs: output_display=$(printf "%.0f" "$output_throughput_mean" 2>/dev/null || echo "$output_throughput_mean") echo "| ${label} | ✅ Success | $ttft_display | $e2e_display | $input_display | $output_display |" >> $GITHUB_STEP_SUMMARY + + # Optional GPU utilization table if monitor output exists + gpu_json="$result_folder/gpu_utilization.json" + if [ -f "$gpu_json" ]; then + overall_mean=$(jq -r '.overall.mean // 0' "$gpu_json") + printf "\n#### GPU Utilization — %s\n\n" "$label" >> $GITHUB_STEP_SUMMARY + printf "Overall mean: %.2f%%\n\n" "$overall_mean" >> $GITHUB_STEP_SUMMARY + echo "| GPU | Mean (%) | p5 | p10 | p25 | p50 | p75 | p90 | p95 |" >> $GITHUB_STEP_SUMMARY + echo "|-----|----------|----|-----|-----|-----|-----|-----|-----|" >> $GITHUB_STEP_SUMMARY + jq -r ' + .per_gpu + | to_entries[] + | [ .key, + (.value.mean // 0), + (.value.p5 // 0), + (.value.p10 // 0), + (.value.p25 // 0), + (.value.p50 // 0), + (.value.p75 // 0), + (.value.p90 // 0), + (.value.p95 // 0) + ] + | @tsv' "$gpu_json" \ + | while IFS=$'\t' read -r gpu m p5 p10 p25 p50 p75 p90 p95; do + printf "| %s | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f | %.2f |\n" "$gpu" "$m" "$p5" "$p10" "$p25" "$p50" "$p75" "$p90" "$p95" >> $GITHUB_STEP_SUMMARY + done + echo "" >> $GITHUB_STEP_SUMMARY + fi fi fi done diff --git 
def _gpu_monitor_should_run(thresholds: Optional[dict]) -> bool:
    """Decide whether the GPU utilization monitor should be started.

    The monitor is wanted when ``thresholds`` carries a non-None
    ``gpu_util_mean_min`` or ``gpu_util_p50_min`` entry, or when the
    ``GPU_UTIL_LOG`` environment variable is ``1``/``true``/``yes``
    (case-insensitive).

    Args:
        thresholds: Benchmark thresholds mapping, or None.

    Returns:
        True when GPU utilization should be sampled for this run.
    """
    try:
        if thresholds is not None and (
            thresholds.get("gpu_util_mean_min") is not None
            or thresholds.get("gpu_util_p50_min") is not None
        ):
            return True
    except Exception:
        # A thresholds object without a usable .get() counts as "no GPU
        # thresholds requested"; the environment flag below can still opt in.
        pass
    return os.environ.get("GPU_UTIL_LOG", "").lower() in ("1", "true", "yes")


def _gpu_monitor_path(experiment_folder: str) -> str:
    """Return the JSON path used to store GPU monitor results.

    The file is ``gpu_utilization.json`` inside ``experiment_folder``,
    resolved against the current working directory.
    """
    return str(Path.cwd() / experiment_folder / "gpu_utilization.json")


def _launch_gpu_monitor(bench_pid: int, experiment_folder: str, interval: float):
    """Start the GPU monitor in a daemon child process.

    Args:
        bench_pid: PID of the benchmark process the monitor should follow.
        experiment_folder: Folder (relative to the CWD) receiving the JSON.
        interval: Seconds between utilization samples.

    Returns:
        ``(process, output_path)`` on success, ``(None, None)`` when the
        process could not be started (the failure is logged, not raised).
    """
    try:
        from multiprocessing import Process

        out_path = _gpu_monitor_path(experiment_folder)
        proc = Process(
            target=_gpu_monitor_proc_entry,
            args=(bench_pid, out_path, interval),
            daemon=True,
        )
        proc.start()
        return proc, out_path
    except Exception as e:
        logger.warning("Failed to launch GPU monitor: %s", e)
        return None, None


def _read_gpu_monitor_result(path: Optional[str]) -> Optional[dict]:
    """Load the monitor's JSON output.

    Returns:
        The parsed JSON, or None when ``path`` is empty, the file does not
        exist, or it cannot be read/parsed (a warning is logged in that case).
    """
    try:
        if path and os.path.exists(path):
            with open(path, "r") as f:
                return json.load(f)
    except Exception as e:
        logger.warning("Failed to read GPU monitor result from %r: %s", path, e)
    return None


def _log_and_assert_gpu_thresholds(
    result: Optional[dict], thresholds: Optional[dict]
) -> None:
    """Log overall GPU utilization and enforce the optional thresholds.

    When the monitor produced no samples (missing result or ``count`` <= 0)
    only a warning is logged and no threshold is enforced.

    Args:
        result: Parsed monitor JSON, or None.
        thresholds: Mapping that may hold ``gpu_util_mean_min`` and/or
            ``gpu_util_p50_min`` (percent values).

    Raises:
        AssertionError: If the overall mean or p50 is below its threshold.
    """
    if not result or not isinstance(result, dict) or result.get("count", 0) <= 0:
        logger.warning("GPU utilization monitor produced no samples.")
        return

    overall = result.get("overall", {})
    count = int(result.get("count", 0))
    mean_th = None if thresholds is None else thresholds.get("gpu_util_mean_min")
    p50_th = None if thresholds is None else thresholds.get("gpu_util_p50_min")

    mean_v = float(overall.get("mean", 0.0))
    p50_v = overall.get("p50")

    logger.info(
        "GPU utilization overall: mean=%.2f%% p50=%s (samples=%d)",
        mean_v,
        (f"{float(p50_v):.2f}%" if p50_v is not None else "n/a"),
        count,
    )

    if mean_th is not None:
        assert mean_v >= float(
            mean_th
        ), f"GPU utilization mean below threshold: {mean_v:.2f}% < {mean_th}%"
    if p50_th is not None and p50_v is not None:
        p50_f = float(p50_v)
        assert p50_f >= float(
            p50_th
        ), f"GPU utilization p50 below threshold: {p50_f:.2f}% < {p50_th}%"


def _gpu_summary(samples: list) -> dict:
    """Return mean, nearest-rank p5..p95, min and max of ``samples``.

    Every statistic is 0.0 for an empty sample list.
    """
    ordered = sorted(samples)  # sort once; all percentiles index into it
    last = len(ordered) - 1

    def pct(p: float) -> float:
        if not ordered:
            return 0.0
        idx = max(0, min(last, int(round((p / 100.0) * last))))
        return float(ordered[idx])

    summary = {"mean": float(sum(ordered) / len(ordered)) if ordered else 0.0}
    for p in (5, 10, 25, 50, 75, 90, 95):
        summary[f"p{p}"] = pct(p)
    summary["min"] = float(ordered[0]) if ordered else 0.0
    summary["max"] = float(ordered[-1]) if ordered else 0.0
    return summary


def _write_gpu_monitor_json(out_file: str, payload: dict) -> None:
    """Write ``payload`` as JSON to ``out_file``, creating parent directories."""
    parent = os.path.dirname(out_file)
    if parent:  # os.makedirs("") raises, so skip it for bare file names
        os.makedirs(parent, exist_ok=True)
    with open(out_file, "w") as f:
        json.dump(payload, f)


def _gpu_monitor_proc_entry(bench_pid: int, out_file: str, interval: float) -> None:
    """Sample GPU utilization via NVML until the benchmark process exits.

    Intended to run in a separate process. Every ``interval`` seconds it
    reads the utilization of each GPU, and stops once ``/proc/<bench_pid>``
    no longer exists. On exit it writes one JSON document to ``out_file``:

    * NVML usable: ``bench_pid``, ``interval_sec``, ``count``, ``overall``
      and ``per_gpu`` summaries (mean, p5..p95, min, max) and ``raw`` samples.
    * NVML unavailable (import, init or device enumeration failed): the
      minimal payload ``{"count": 0, "overall": {"mean": 0.0}, "per_gpu": {},
      "raw": {}}``.

    The output is written exactly once, after the payload is fully built,
    so a fallback payload is never overwritten or left truncated.

    Args:
        bench_pid: PID of the benchmark process being followed.
        out_file: Destination JSON path; parent directories are created.
        interval: Seconds to sleep between samples.
    """
    try:
        os.nice(10)  # best effort: keep the monitor from competing with the bench
    except Exception:
        pass

    nvml = None  # set once nvmlInit() succeeded, so shutdown is attempted
    nvml_ready = False  # True only when device handles were obtained
    handles: list = []
    per_gpu_samples: dict = {}
    overall_samples: list = []
    try:
        try:
            import pynvml  # type: ignore

            pynvml.nvmlInit()
            nvml = pynvml
            handles = [
                pynvml.nvmlDeviceGetHandleByIndex(i)
                for i in range(pynvml.nvmlDeviceGetCount())
            ]
            nvml_ready = True
        except Exception:
            nvml_ready = False

        while nvml_ready and os.path.exists(f"/proc/{bench_pid}"):
            vals = []
            for idx, handle in enumerate(handles):
                try:
                    util = float(nvml.nvmlDeviceGetUtilizationRates(handle).gpu)
                except Exception:
                    # A GPU that fails to report is skipped for this tick only.
                    continue
                vals.append(util)
                per_gpu_samples.setdefault(str(idx), []).append(util)
            if vals:
                overall_samples.append(sum(vals) / len(vals))
            time.sleep(interval)
    finally:
        if nvml_ready:
            payload = {
                "bench_pid": bench_pid,
                "interval_sec": interval,
                "count": len(overall_samples),
                "overall": _gpu_summary(overall_samples),
                "per_gpu": {
                    key: {**_gpu_summary(arr), "count": len(arr)}
                    for key, arr in per_gpu_samples.items()
                },
                "raw": {
                    "overall": overall_samples,
                    "per_gpu": per_gpu_samples,
                },
            }
        else:
            payload = {
                "count": 0,
                "overall": {"mean": 0.0},
                "per_gpu": {},
                "raw": {},
            }
        try:
            _write_gpu_monitor_json(out_file, payload)
        except Exception as e:
            # Monitoring is best effort; report the failure but never crash.
            logger.warning("Failed to write GPU monitor result to %r: %s", out_file, e)
        if nvml is not None:
            try:
                nvml.nvmlShutdown()
            except Exception:
                pass
+ monitor_path = None + monitor_proc = None + gpu_util_result: dict | None = None + want_gpu_monitor = _gpu_monitor_should_run(thresholds) + if want_gpu_monitor: + interval = float(os.environ.get("GPU_UTIL_SAMPLE_INTERVAL", "2.0")) + monitor_proc, monitor_path = _launch_gpu_monitor( + bench_pid=proc.pid, + experiment_folder=experiment_folder, + interval=interval, + ) stdout = stderr = "" rc = None try: @@ -348,11 +586,16 @@ def genai_bench_runner() -> Callable[..., None]: ) actual_folder = candidates[0] - json_files = [ - p - for p in actual_folder.rglob("*.json") - if "experiment_metadata" not in p.name - ] + json_files = [] + for _ in range(10): + json_files = [ + p + for p in actual_folder.rglob("*.json") + if "experiment_metadata" not in p.name + ] + if json_files: + break + time.sleep(1) if not json_files: raise AssertionError( "Benchmark failed: no JSON results found\n" @@ -365,36 +608,50 @@ def genai_bench_runner() -> Callable[..., None]: with jf.open("r") as f: data = json.load(f) stats = data.get("aggregated_metrics", {}).get("stats", {}) - ttft_mean = float(stats.get("ttft", {}).get("mean", float("inf"))) - e2e_latency_mean = float( - stats.get("e2e_latency", {}).get("mean", float("inf")) - ) - input_tp_mean = float(stats.get("input_throughput", {}).get("mean", 0.0)) - output_tp_mean = float(stats.get("output_throughput", {}).get("mean", 0.0)) + ttft_mean = float(stats.get("ttft", {}).get("mean", float("inf"))) + e2e_latency_mean = float( + stats.get("e2e_latency", {}).get("mean", float("inf")) + ) + input_tp_mean = float( + stats.get("input_throughput", {}).get("mean", 0.0) + ) + output_tp_mean = float( + stats.get("output_throughput", {}).get("mean", 0.0) + ) - logger.info( - "genai-bench[%s] %s ttft_mean=%.3fs e2e_latency_mean=%.3fs input_tp_mean=%.1f tok/s output_tp_mean=%.1f tok/s", - experiment_folder, - jf.name, - ttft_mean, - e2e_latency_mean, - input_tp_mean, - output_tp_mean, - ) + logger.info( + "genai-bench[%s] %s ttft_mean=%.3fs 
e2e_latency_mean=%.3fs input_tp_mean=%.1f tok/s output_tp_mean=%.1f tok/s", + experiment_folder, + jf.name, + ttft_mean, + e2e_latency_mean, + input_tp_mean, + output_tp_mean, + ) - if th is not None: - assert ( - ttft_mean <= th["ttft_mean_max"] - ), f"TTFT validation failed: {ttft_mean} > {th['ttft_mean_max']} (file={jf.name})" - assert ( - e2e_latency_mean <= th["e2e_latency_mean_max"] - ), f"E2E latency validation failed: {e2e_latency_mean} > {th['e2e_latency_mean_max']} (file={jf.name})" - assert ( - input_tp_mean >= th["input_throughput_mean_min"] - ), f"Input throughput validation failed: {input_tp_mean} < {th['input_throughput_mean_min']} (file={jf.name})" - assert ( - output_tp_mean >= th["output_throughput_mean_min"] - ), f"Output throughput validation failed: {output_tp_mean} < {th['output_throughput_mean_min']} (file={jf.name})" + if th is not None: + assert ( + ttft_mean <= th["ttft_mean_max"] + ), f"TTFT validation failed: {ttft_mean} > {th['ttft_mean_max']} (file={jf.name})" + assert ( + e2e_latency_mean <= th["e2e_latency_mean_max"] + ), f"E2E latency validation failed: {e2e_latency_mean} > {th['e2e_latency_mean_max']} (file={jf.name})" + assert ( + input_tp_mean >= th["input_throughput_mean_min"] + ), f"Input throughput validation failed: {input_tp_mean} < {th['input_throughput_mean_min']} (file={jf.name})" + assert ( + output_tp_mean >= th["output_throughput_mean_min"] + ), f"Output throughput validation failed: {output_tp_mean} < {th['output_throughput_mean_min']} (file={jf.name})" + + # Validate optional GPU utilization threshold if provided + if want_gpu_monitor: + try: + if monitor_proc is not None: + monitor_proc.join(timeout=5) + except Exception: + pass + gpu_util_result = _read_gpu_monitor_result(monitor_path) + _log_and_assert_gpu_thresholds(gpu_util_result, thresholds) finally: # Always attempt to stop workers to avoid resource leakage @@ -411,6 +668,12 @@ def genai_bench_runner() -> Callable[..., None]: time.sleep(2) except Exception: 
pass + # Ensure GPU monitor process is cleaned up + if monitor_proc is not None and monitor_proc.is_alive(): + try: + monitor_proc.terminate() + except Exception: + pass return _run diff --git a/sgl-router/py_test/e2e/test_pd_router.py b/sgl-router/py_test/e2e/test_pd_router.py index f6a73cd01..93d92ad2a 100644 --- a/sgl-router/py_test/e2e/test_pd_router.py +++ b/sgl-router/py_test/e2e/test_pd_router.py @@ -257,6 +257,7 @@ def test_pd_genai_bench(e2e_model: str, pd_cluster, genai_bench_runner): "e2e_latency_mean_max": 15, "input_throughput_mean_min": 400, "output_throughput_mean_min": 20, + "gpu_util_p50_min": 99, }, kill_procs=pd_cluster.workers, ) diff --git a/sgl-router/py_test/e2e/test_regular_router.py b/sgl-router/py_test/e2e/test_regular_router.py index 856ecda72..d9cf4e0e4 100644 --- a/sgl-router/py_test/e2e/test_regular_router.py +++ b/sgl-router/py_test/e2e/test_regular_router.py @@ -47,6 +47,8 @@ def test_genai_bench( "e2e_latency_mean_max": 14, "input_throughput_mean_min": 1000, "output_throughput_mean_min": 12, + # Enforce GPU utilization p50 >= 99% during the run. + "gpu_util_p50_min": 99, }, kill_procs=e2e_two_workers_dp2, )