From 1a399799935640d6add25839ae31f4b74b6debe4 Mon Sep 17 00:00:00 2001 From: Chao Yang Date: Sat, 24 May 2025 22:28:15 -0700 Subject: [PATCH] Sgl-router Prometheus metrics endpoint and usage track metrics (#6537) --- sgl-router/Cargo.toml | 3 ++ sgl-router/README.md | 12 ++++++ .../py_src/sglang_router/launch_router.py | 20 +++++++++ sgl-router/py_src/sglang_router/router.py | 6 +++ sgl-router/py_test/test_launch_server.py | 10 +++++ sgl-router/src/lib.rs | 22 +++++++++- sgl-router/src/prometheus.rs | 40 +++++++++++++++++ sgl-router/src/router.rs | 43 ++++++++++++++++++- sgl-router/src/server.rs | 13 ++++++ 9 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 sgl-router/src/prometheus.rs diff --git a/sgl-router/Cargo.toml b/sgl-router/Cargo.toml index 8beb82e46..418bf8d2c 100644 --- a/sgl-router/Cargo.toml +++ b/sgl-router/Cargo.toml @@ -30,6 +30,9 @@ tracing-appender = "0.2.3" kube = { version = "0.88.1", features = ["runtime", "derive"] } k8s-openapi = { version = "0.21.0", features = ["v1_29"] } futures = "0.3" +# Added for metrics +metrics = "0.24.2" +metrics-exporter-prometheus = "0.17.0" [profile.release] lto = "thin" codegen-units = 1 diff --git a/sgl-router/README.md b/sgl-router/README.md index 4ec152705..88637bd05 100644 --- a/sgl-router/README.md +++ b/sgl-router/README.md @@ -81,6 +81,18 @@ router = Router( Use the `--verbose` flag with the CLI for more detailed logs. +### Metrics + +SGL Router exposes a Prometheus HTTP scrape endpoint for monitoring, which by default listens at 127.0.0.1:29000. + +To change the endpoint to listen on all network interfaces and set the port to 9000, configure the following options when launching the router: +``` +python -m sglang_router.launch_router \ + --worker-urls http://localhost:8080 http://localhost:8081 \ + --prometheus-host 0.0.0.0 \ + --prometheus-port 9000 +``` + ### Kubernetes Service Discovery SGL Router supports automatic service discovery for worker nodes in Kubernetes environments. When enabled, the router will automatically: diff --git a/sgl-router/py_src/sglang_router/launch_router.py b/sgl-router/py_src/sglang_router/launch_router.py index 4826c4465..4f036a253 100644 --- a/sgl-router/py_src/sglang_router/launch_router.py +++ b/sgl-router/py_src/sglang_router/launch_router.py @@ -48,6 +48,9 @@ class RouterArgs: selector: Dict[str, str] = dataclasses.field(default_factory=dict) service_discovery_port: int = 80 service_discovery_namespace: Optional[str] = None + # Prometheus configuration + prometheus_port: Optional[int] = None + prometheus_host: Optional[str] = None @staticmethod def add_cli_args( @@ -176,6 +179,19 @@ class RouterArgs: type=str, help="Kubernetes namespace to watch for pods. If not provided, watches all namespaces (requires cluster-wide permissions)", ) + # Prometheus configuration + parser.add_argument( + f"--{prefix}prometheus-port", + type=int, + default=29000, + help="Port to expose Prometheus metrics. If not specified, Prometheus metrics are disabled", + ) + parser.add_argument( + f"--{prefix}prometheus-host", + type=str, + default="127.0.0.1", + help="Host address to bind the Prometheus metrics server", + ) @classmethod def from_cli_args( @@ -215,6 +231,8 @@ class RouterArgs: service_discovery_namespace=getattr( args, f"{prefix}service_discovery_namespace", None ), + prometheus_port=getattr(args, f"{prefix}prometheus_port", None), + prometheus_host=getattr(args, f"{prefix}prometheus_host", None), ) @staticmethod @@ -278,6 +296,8 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: selector=router_args.selector, service_discovery_port=router_args.service_discovery_port, service_discovery_namespace=router_args.service_discovery_namespace, + prometheus_port=router_args.prometheus_port, + prometheus_host=router_args.prometheus_host, ) router.start() diff --git a/sgl-router/py_src/sglang_router/router.py b/sgl-router/py_src/sglang_router/router.py index 3490a2574..c189cd587 100644 --- a/sgl-router/py_src/sglang_router/router.py +++ b/sgl-router/py_src/sglang_router/router.py @@ -40,6 +40,8 @@ class Router: worker URLs using this port. Default: 80 service_discovery_namespace: Kubernetes namespace to watch for pods. If not provided, watches pods across all namespaces (requires cluster-wide permissions). Default: None + prometheus_port: Port to expose Prometheus metrics. Default: None + prometheus_host: Host address to bind the Prometheus metrics server. Default: None """ def __init__( @@ -62,6 +64,8 @@ class Router: selector: Dict[str, str] = None, service_discovery_port: int = 80, service_discovery_namespace: Optional[str] = None, + prometheus_port: Optional[int] = None, + prometheus_host: Optional[str] = None, ): if selector is None: selector = {} @@ -85,6 +89,8 @@ class Router: selector=selector, service_discovery_port=service_discovery_port, service_discovery_namespace=service_discovery_namespace, + prometheus_port=prometheus_port, + prometheus_host=prometheus_host, ) def start(self) -> None: diff --git a/sgl-router/py_test/test_launch_server.py b/sgl-router/py_test/test_launch_server.py index afffe334f..bfba8a765 100644 --- a/sgl-router/py_test/test_launch_server.py +++ b/sgl-router/py_test/test_launch_server.py @@ -28,6 +28,8 @@ def popen_launch_router( selector: list = None, service_discovery_port: int = 80, service_discovery_namespace: str = None, + prometheus_port: int = None, + prometheus_host: str = None, ): """ Launch the router server process. @@ -45,6 +47,8 @@ def popen_launch_router( selector: List of label selectors in format ["key1=value1", "key2=value2"] service_discovery_port: Port to use for service discovery service_discovery_namespace: Kubernetes namespace to watch for pods. If None, watches all namespaces. + prometheus_port: Port to expose Prometheus metrics. If None, Prometheus metrics are disabled. + prometheus_host: Host address to bind the Prometheus metrics server. """ _, host, port = base_url.split(":") host = host[2:] @@ -87,6 +91,12 @@ def popen_launch_router( ["--router-service-discovery-namespace", service_discovery_namespace] ) + if prometheus_port is not None: + command.extend(["--router-prometheus-port", str(prometheus_port)]) + + if prometheus_host is not None: + command.extend(["--router-prometheus-host", prometheus_host]) + if log_dir is not None: command.extend(["--log-dir", log_dir]) diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 4223a53fe..4915d3c52 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -1,10 +1,12 @@ use pyo3::prelude::*; pub mod logging; use std::collections::HashMap; +pub mod prometheus; pub mod router; pub mod server; pub mod service_discovery; pub mod tree; +use crate::prometheus::PrometheusConfig; #[pyclass(eq)] #[derive(Clone, PartialEq, Debug)] @@ -35,6 +37,8 @@ struct Router { selector: HashMap, service_discovery_port: u16, service_discovery_namespace: Option, + prometheus_port: Option, + prometheus_host: Option, } #[pymethods] @@ -58,7 +62,9 @@ impl Router { service_discovery = false, selector = HashMap::new(), service_discovery_port = 80, - service_discovery_namespace = None + service_discovery_namespace = None, + prometheus_port = None, + prometheus_host = None ))] fn new( worker_urls: Vec, @@ -79,6 +85,8 @@ impl Router { selector: HashMap, service_discovery_port: u16, service_discovery_namespace: Option, + prometheus_port: Option, + prometheus_host: Option, ) -> PyResult { Ok(Router { host, @@ -99,6 +107,8 @@ impl Router { selector, service_discovery_port, service_discovery_namespace, + prometheus_port, + prometheus_host, }) } @@ -136,6 +146,15 @@ impl Router { None }; + // Create Prometheus config if enabled + let prometheus_config = Some(PrometheusConfig { + port: self.prometheus_port.unwrap_or(29000), + host: self + .prometheus_host + .clone() + .unwrap_or_else(|| "127.0.0.1".to_string()), + }); + actix_web::rt::System::new().block_on(async move { server::startup(server::ServerConfig { host: self.host.clone(), @@ -146,6 +165,7 @@ impl Router { max_payload_size: self.max_payload_size, log_dir: self.log_dir.clone(), service_discovery_config, + prometheus_config, }) .await .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?; diff --git a/sgl-router/src/prometheus.rs b/sgl-router/src/prometheus.rs new file mode 100644 index 000000000..ff5a221bd --- /dev/null +++ b/sgl-router/src/prometheus.rs @@ -0,0 +1,40 @@ +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct PrometheusConfig { + pub port: u16, + pub host: String, +} + +impl Default for PrometheusConfig { + fn default() -> Self { + Self { + port: 29000, + host: "0.0.0.0".to_string(), + } + } +} + +pub fn start_prometheus(config: PrometheusConfig) { + let duration_matcher = Matcher::Suffix(String::from("duration")); + let duration_bucket = [ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 45.0, + 60.0, 90.0, 120.0, 180.0, 240.0, + ]; + + let ip_addr: IpAddr = config + .host + .parse() + .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + let socket_addr = SocketAddr::new(ip_addr, config.port); + + PrometheusBuilder::new() + .with_http_listener(socket_addr) + .upkeep_timeout(Duration::from_secs(5 * 60)) + .set_buckets_for_metric(duration_matcher, &duration_bucket) + .expect("failed to set duration bucket") + .install() + .expect("failed to install Prometheus metrics exporter"); +} diff --git a/sgl-router/src/router.rs b/sgl-router/src/router.rs index 678c416b5..1666cc0db 100644 --- a/sgl-router/src/router.rs +++ b/sgl-router/src/router.rs @@ -1,4 +1,5 @@ use crate::tree::Tree; +use ::metrics::{counter, gauge, histogram}; use actix_web::http::header::{HeaderValue, CONTENT_TYPE}; use actix_web::{HttpRequest, HttpResponse}; use bytes::Bytes; @@ -10,6 +11,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::Duration; +use std::time::Instant; use tokio; use tracing::{debug, error, info, warn}; @@ -135,6 +137,9 @@ pub enum PolicyConfig { impl Router { pub fn new(worker_urls: Vec, policy_config: PolicyConfig) -> Result { + // Update active workers gauge + gauge!("sgl_router_active_workers").set(worker_urls.len() as f64); + // Get timeout and interval from policy config let (timeout_secs, interval_secs) = match &policy_config { PolicyConfig::RandomConfig { @@ -328,6 +333,7 @@ impl Router { route: &str, req: &HttpRequest, ) -> HttpResponse { + let start = Instant::now(); let mut request_builder = client.get(format!("{}{}", worker_url, route)); // Copy all headers from original request except for /health because it does not need authorization @@ -337,7 +343,7 @@ impl Router { } } - match request_builder.send().await { + let response = match request_builder.send().await { Ok(res) => { let status = actix_web::http::StatusCode::from_u16(res.status().as_u16()) .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); @@ -352,7 +358,21 @@ impl Router { "Failed to send request to worker {}: {}", worker_url, e )), + }; + + // Record request metrics + if route != "/health" { + let duration = start.elapsed(); + counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1); + histogram!("sgl_router_request_duration_seconds", "route" => route.to_string()) + .record(duration.as_secs_f64()); + + if !response.status().is_success() { + counter!("sgl_router_request_errors_total", "route" => route.to_string()) + .increment(1); + } } + response } pub async fn route_to_first( @@ -510,6 +530,10 @@ impl Router { max_load, min_load, running_queue ); + counter!("sgl_router_load_balancing_events_total").increment(1); + gauge!("sgl_router_max_load").set(max_load as f64); + gauge!("sgl_router_min_load").set(min_load as f64); + // Use shortest queue routing when load is imbalanced running_queue .iter() @@ -523,8 +547,10 @@ impl Router { matched_text.chars().count() as f32 / text.chars().count() as f32; if matched_rate > *cache_threshold { + counter!("sgl_router_cache_hits_total").increment(1); matched_worker.to_string() } else { + counter!("sgl_router_cache_misses_total").increment(1); tree.get_smallest_tenant() } }; @@ -537,6 +563,11 @@ impl Router { .unwrap() .get_mut(&selected_url) .unwrap() += 1; + + gauge!("sgl_router_running_requests", "worker" => selected_url.to_string()) + .set(*running_queue.get(&selected_url).unwrap() as f64); + counter!("sgl_router_processed_requests_total", "worker" => selected_url.to_string()).increment(1); + tree.insert(&text, &selected_url); selected_url @@ -636,6 +667,7 @@ impl Router { body: &Bytes, route: &str, ) -> HttpResponse { + let start = Instant::now(); const MAX_REQUEST_RETRIES: u32 = 3; const MAX_TOTAL_RETRIES: u32 = 6; let mut total_retries = 0; @@ -648,18 +680,24 @@ impl Router { while request_retries < MAX_REQUEST_RETRIES { if total_retries >= 1 { info!("Retrying request after {} failed attempts", total_retries); + counter!("sgl_router_retries_total", "route" => route.to_string()).increment(1); } + let response = self .send_generate_request(client, req, body, route, &worker_url) .await; if response.status().is_success() { + let duration = start.elapsed(); + histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string()).record(duration.as_secs_f64()); return response; } else { // if the worker is healthy, it means the request is bad, so return the error response let health_response = self.send_request(client, &worker_url, "/health", req).await; if health_response.status().is_success() { + counter!("sgl_router_request_errors_total", "route" => route.to_string()) + .increment(1); return response; } } @@ -682,6 +720,7 @@ impl Router { } } + counter!("sgl_router_request_errors_total", "route" => route.to_string()).increment(1); HttpResponse::InternalServerError().body("All retry attempts failed") } @@ -733,6 +772,7 @@ impl Router { } info!("Added worker: {}", worker_url); urls.push(worker_url.to_string()); + gauge!("sgl_router_active_workers").set(urls.len() as f64); } } @@ -804,6 +844,7 @@ impl Router { if let Some(index) = urls.iter().position(|url| url == &worker_url) { urls.remove(index); info!("Removed worker: {}", worker_url); + gauge!("sgl_router_active_workers").set(urls.len() as f64); } else { warn!("Worker {} not found, skipping removal", worker_url); return; diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index e556a3d52..e60760415 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -1,4 +1,5 @@ use crate::logging::{self, LoggingConfig}; +use crate::prometheus::{self, PrometheusConfig}; use crate::router::PolicyConfig; use crate::router::Router; use crate::service_discovery::{start_service_discovery, ServiceDiscoveryConfig}; @@ -161,6 +162,7 @@ pub struct ServerConfig { pub max_payload_size: usize, pub log_dir: Option, pub service_discovery_config: Option, + pub prometheus_config: Option, } pub async fn startup(config: ServerConfig) -> std::io::Result<()> { @@ -184,6 +186,17 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { None }; + // Initialize prometheus metrics exporter + if let Some(prometheus_config) = config.prometheus_config { + info!( + "🚧 Initializing Prometheus metrics on {}:{}", + prometheus_config.host, prometheus_config.port + ); + prometheus::start_prometheus(prometheus_config); + } else { + info!("🚧 Prometheus metrics disabled"); + } + info!("🚧 Initializing router on {}:{}", config.host, config.port); info!("🚧 Initializing workers on {:?}", config.worker_urls); info!("🚧 Policy Config: {:?}", config.policy_config);