diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 49e8cc573..a37a4b474 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -3,14 +3,14 @@ pub mod config; pub mod logging; use std::collections::HashMap; pub mod core; +pub mod metrics; pub mod openai_api_types; pub mod policies; -pub mod prometheus; pub mod routers; pub mod server; pub mod service_discovery; pub mod tree; -use crate::prometheus::PrometheusConfig; +use crate::metrics::PrometheusConfig; #[pyclass(eq)] #[derive(Clone, PartialEq, Debug)] diff --git a/sgl-router/src/metrics.rs b/sgl-router/src/metrics.rs new file mode 100644 index 000000000..0ff2055c5 --- /dev/null +++ b/sgl-router/src/metrics.rs @@ -0,0 +1,324 @@ +use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram}; +use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct PrometheusConfig { + pub port: u16, + pub host: String, +} + +impl Default for PrometheusConfig { + fn default() -> Self { + Self { + port: 29000, + host: "0.0.0.0".to_string(), + } + } +} + +pub fn init_metrics() { + // Request metrics + describe_counter!( + "sgl_router_requests_total", + "Total number of requests by route and method" + ); + describe_histogram!( + "sgl_router_request_duration_seconds", + "Request duration in seconds by route" + ); + describe_counter!( + "sgl_router_request_errors_total", + "Total number of request errors by route and error type" + ); + describe_counter!( + "sgl_router_retries_total", + "Total number of request retries by route" + ); + + // Worker metrics + describe_gauge!( + "sgl_router_active_workers", + "Number of currently active workers" + ); + describe_gauge!( + "sgl_router_worker_health", + "Worker health status (1=healthy, 0=unhealthy)" + ); + describe_gauge!("sgl_router_worker_load", "Current load on each worker"); + describe_counter!( + "sgl_router_processed_requests_total", + "Total requests processed by each worker" + ); + + // Policy metrics + describe_counter!( + "sgl_router_policy_decisions_total", + "Total routing policy decisions by policy and worker" + ); + describe_counter!("sgl_router_cache_hits_total", "Total cache hits"); + describe_counter!("sgl_router_cache_misses_total", "Total cache misses"); + describe_gauge!( + "sgl_router_tree_size", + "Current tree size for cache-aware routing" + ); + describe_counter!( + "sgl_router_load_balancing_events_total", + "Total load balancing trigger events" + ); + describe_gauge!("sgl_router_max_load", "Maximum worker load"); + describe_gauge!("sgl_router_min_load", "Minimum worker load"); + + // PD-specific metrics + describe_counter!("sgl_router_pd_requests_total", "Total PD requests by route"); + describe_counter!( + "sgl_router_pd_prefill_requests_total", + "Total prefill requests per worker" + ); + describe_counter!( + "sgl_router_pd_decode_requests_total", + "Total decode requests per worker" + ); + describe_counter!( + "sgl_router_pd_errors_total", + "Total PD errors by error type" + ); + describe_counter!( + "sgl_router_pd_prefill_errors_total", + "Total prefill server errors" + ); + describe_counter!( + "sgl_router_pd_decode_errors_total", + "Total decode server errors" + ); + describe_counter!( + "sgl_router_pd_stream_errors_total", + "Total streaming errors per worker" + ); + describe_histogram!( + "sgl_router_pd_request_duration_seconds", + "PD request duration by route" + ); + + // Service discovery metrics + describe_counter!( + "sgl_router_discovery_updates_total", + "Total service discovery update events" + ); + describe_gauge!( + "sgl_router_discovery_workers_added", + "Number of workers added in last discovery update" + ); + describe_gauge!( + "sgl_router_discovery_workers_removed", + "Number of workers removed in last discovery update" + ); + + // Generate request specific metrics + describe_histogram!( + "sgl_router_generate_duration_seconds", + "Generate request duration" + ); + + // Running requests gauge for cache-aware policy + describe_gauge!( + "sgl_router_running_requests", + "Number of running requests per worker" + ); +} + +pub fn start_prometheus(config: PrometheusConfig) { + // Initialize metric descriptions + init_metrics(); + + let duration_matcher = Matcher::Suffix(String::from("duration")); + let duration_bucket = [ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 45.0, + 60.0, 90.0, 120.0, 180.0, 240.0, + ]; + + let ip_addr: IpAddr = config + .host + .parse() + .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); + let socket_addr = SocketAddr::new(ip_addr, config.port); + + PrometheusBuilder::new() + .with_http_listener(socket_addr) + .upkeep_timeout(Duration::from_secs(5 * 60)) + .set_buckets_for_metric(duration_matcher, &duration_bucket) + .expect("failed to set duration bucket") + .install() + .expect("failed to install Prometheus metrics exporter"); +} + +pub struct RouterMetrics; + +impl RouterMetrics { + // Request metrics + pub fn record_request(route: &str) { + counter!("sgl_router_requests_total", + "route" => route.to_string() + ) + .increment(1); + } + + pub fn record_request_duration(route: &str, duration: Duration) { + histogram!("sgl_router_request_duration_seconds", + "route" => route.to_string() + ) + .record(duration.as_secs_f64()); + } + + pub fn record_request_error(route: &str, error_type: &str) { + counter!("sgl_router_request_errors_total", + "route" => route.to_string(), + "error_type" => error_type.to_string() + ) + .increment(1); + } + + pub fn record_retry(route: &str) { + counter!("sgl_router_retries_total", + "route" => route.to_string() + ) + .increment(1); + } + + // Worker metrics + pub fn set_active_workers(count: usize) { + gauge!("sgl_router_active_workers").set(count as f64); + } + + pub fn set_worker_health(worker_url: &str, healthy: bool) { + gauge!("sgl_router_worker_health", + "worker" => worker_url.to_string() + ) + .set(if healthy { 1.0 } else { 0.0 }); + } + + pub fn set_worker_load(worker_url: &str, load: usize) { + gauge!("sgl_router_worker_load", + "worker" => worker_url.to_string() + ) + .set(load as f64); + } + + pub fn record_processed_request(worker_url: &str) { + counter!("sgl_router_processed_requests_total", + "worker" => worker_url.to_string() + ) + .increment(1); + } + + // Policy metrics + pub fn record_policy_decision(policy: &str, worker: &str) { + counter!("sgl_router_policy_decisions_total", + "policy" => policy.to_string(), + "worker" => worker.to_string() + ) + .increment(1); + } + + pub fn record_cache_hit() { + counter!("sgl_router_cache_hits_total").increment(1); + } + + pub fn record_cache_miss() { + counter!("sgl_router_cache_misses_total").increment(1); + } + + pub fn set_tree_size(worker: &str, size: usize) { + gauge!("sgl_router_tree_size", + "worker" => worker.to_string() + ) + .set(size as f64); + } + + pub fn record_load_balancing_event() { + counter!("sgl_router_load_balancing_events_total").increment(1); + } + + pub fn set_load_range(max_load: usize, min_load: usize) { + gauge!("sgl_router_max_load").set(max_load as f64); + gauge!("sgl_router_min_load").set(min_load as f64); + } + + // PD-specific metrics + pub fn record_pd_request(route: &str) { + counter!("sgl_router_pd_requests_total", + "route" => route.to_string() + ) + .increment(1); + } + + pub fn record_pd_request_duration(route: &str, duration: Duration) { + histogram!("sgl_router_pd_request_duration_seconds", + "route" => route.to_string() + ) + .record(duration.as_secs_f64()); + } + + pub fn record_pd_prefill_request(worker: &str) { + counter!("sgl_router_pd_prefill_requests_total", + "worker" => worker.to_string() + ) + .increment(1); + } + + pub fn record_pd_decode_request(worker: &str) { + counter!("sgl_router_pd_decode_requests_total", + "worker" => worker.to_string() + ) + .increment(1); + } + + pub fn record_pd_error(error_type: &str) { + counter!("sgl_router_pd_errors_total", + "error_type" => error_type.to_string() + ) + .increment(1); + } + + pub fn record_pd_prefill_error(worker: &str) { + counter!("sgl_router_pd_prefill_errors_total", + "worker" => worker.to_string() + ) + .increment(1); + } + + pub fn record_pd_decode_error(worker: &str) { + counter!("sgl_router_pd_decode_errors_total", + "worker" => worker.to_string() + ) + .increment(1); + } + + pub fn record_pd_stream_error(worker: &str) { + counter!("sgl_router_pd_stream_errors_total", + "worker" => worker.to_string() + ) + .increment(1); + } + + // Service discovery metrics + pub fn record_discovery_update(added: usize, removed: usize) { + counter!("sgl_router_discovery_updates_total").increment(1); + gauge!("sgl_router_discovery_workers_added").set(added as f64); + gauge!("sgl_router_discovery_workers_removed").set(removed as f64); + } + + // Generate request metrics + pub fn record_generate_duration(duration: Duration) { + histogram!("sgl_router_generate_duration_seconds").record(duration.as_secs_f64()); + } + + // Running requests for cache-aware policy + pub fn set_running_requests(worker: &str, count: usize) { + gauge!("sgl_router_running_requests", + "worker" => worker.to_string() + ) + .set(count as f64); + } +} diff --git a/sgl-router/src/policies/cache_aware.rs b/sgl-router/src/policies/cache_aware.rs index db5972ba6..9e30c0d01 100644 --- a/sgl-router/src/policies/cache_aware.rs +++ b/sgl-router/src/policies/cache_aware.rs @@ -61,8 +61,8 @@ use super::{get_healthy_worker_indices, CacheAwareConfig, LoadBalancingPolicy}; use crate::core::Worker; +use crate::metrics::RouterMetrics; use crate::tree::Tree; -use metrics::{counter, gauge}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -171,9 +171,8 @@ impl LoadBalancingPolicy for CacheAwarePolicy { max_load, min_load, worker_loads ); - counter!("sgl_router_load_balancing_events_total").increment(1); - gauge!("sgl_router_max_load").set(max_load as f64); - gauge!("sgl_router_min_load").set(min_load as f64); + RouterMetrics::record_load_balancing_event(); + RouterMetrics::set_load_range(max_load, min_load); // Use shortest queue when imbalanced let min_load_idx = healthy_indices @@ -183,8 +182,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy { // Increment processed counter workers[min_load_idx].increment_processed(); - counter!("sgl_router_processed_requests_total", "worker" => workers[min_load_idx].url().to_string()) - .increment(1); + RouterMetrics::record_processed_request(workers[min_load_idx].url()); return Some(min_load_idx); } @@ -201,10 +199,10 @@ impl LoadBalancingPolicy for CacheAwarePolicy { }; let selected_url = if match_rate > self.config.cache_threshold { - counter!("sgl_router_cache_hits_total").increment(1); + RouterMetrics::record_cache_hit(); matched_worker.to_string() } else { - counter!("sgl_router_cache_misses_total").increment(1); + RouterMetrics::record_cache_miss(); tree.get_smallest_tenant() }; @@ -221,7 +219,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy { // Increment processed counter workers[selected_idx].increment_processed(); - counter!("sgl_router_processed_requests_total", "worker" => selected_url).increment(1); + RouterMetrics::record_processed_request(&selected_url); return Some(selected_idx); } diff --git a/sgl-router/src/policies/power_of_two.rs b/sgl-router/src/policies/power_of_two.rs index 53c846196..2167273ae 100644 --- a/sgl-router/src/policies/power_of_two.rs +++ b/sgl-router/src/policies/power_of_two.rs @@ -2,7 +2,7 @@ use super::{get_healthy_worker_indices, LoadBalancingPolicy}; use crate::core::Worker; -use metrics::counter; +use crate::metrics::RouterMetrics; use rand::Rng; use std::collections::HashMap; use std::sync::RwLock; @@ -89,8 +89,7 @@ impl LoadBalancingPolicy for PowerOfTwoPolicy { // Increment processed counter workers[selected_idx].increment_processed(); - counter!("sgl_router_processed_requests_total", "worker" => workers[selected_idx].url().to_string()) - .increment(1); + RouterMetrics::record_processed_request(workers[selected_idx].url()); Some(selected_idx) } diff --git a/sgl-router/src/prometheus.rs b/sgl-router/src/prometheus.rs deleted file mode 100644 index ff5a221bd..000000000 --- a/sgl-router/src/prometheus.rs +++ /dev/null @@ -1,40 +0,0 @@ -use metrics_exporter_prometheus::{Matcher, PrometheusBuilder}; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::time::Duration; - -#[derive(Debug, Clone)] -pub struct PrometheusConfig { - pub port: u16, - pub host: String, -} - -impl Default for PrometheusConfig { - fn default() -> Self { - Self { - port: 29000, - host: "0.0.0.0".to_string(), - } - } -} - -pub fn start_prometheus(config: PrometheusConfig) { - let duration_matcher = Matcher::Suffix(String::from("duration")); - let duration_bucket = [ - 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 15.0, 30.0, 45.0, - 60.0, 90.0, 120.0, 180.0, 240.0, - ]; - - let ip_addr: IpAddr = config - .host - .parse() - .unwrap_or(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0))); - let socket_addr = SocketAddr::new(ip_addr, config.port); - - PrometheusBuilder::new() - .with_http_listener(socket_addr) - .upkeep_timeout(Duration::from_secs(5 * 60)) - .set_buckets_for_metric(duration_matcher, &duration_bucket) - .expect("failed to set duration bucket") - .install() - .expect("failed to install Prometheus metrics exporter"); -} diff --git a/sgl-router/src/routers/pd_router.rs b/sgl-router/src/routers/pd_router.rs index 2ac8f9027..d156c9f34 100644 --- a/sgl-router/src/routers/pd_router.rs +++ b/sgl-router/src/routers/pd_router.rs @@ -4,13 +4,13 @@ use super::pd_types::{api_path, Bootstrap, ChatReqInput, GenerateReqInput, PDRouterError}; use super::request_adapter::ToPdRequest; use crate::core::{HealthChecker, Worker, WorkerFactory, WorkerLoadGuard}; +use crate::metrics::RouterMetrics; use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest}; use crate::policies::LoadBalancingPolicy; use crate::tree::Tree; use actix_web::http::header::{HeaderValue, CONTENT_TYPE}; use actix_web::{HttpRequest, HttpResponse}; use futures_util::{StreamExt, TryStreamExt}; -use metrics::{counter, histogram}; use serde_json::Value; use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; @@ -296,7 +296,7 @@ impl PDRouter { Ok(pair) => pair, Err(e) => { error!("Failed to select PD pair: {}", e); - counter!("sgl_router_pd_errors_total", "error" => "server_selection").increment(1); + RouterMetrics::record_pd_error("server_selection"); return HttpResponse::ServiceUnavailable() .body(format!("No available servers: {}", e)); } @@ -313,7 +313,7 @@ impl PDRouter { // Add bootstrap info using the trait method if let Err(e) = typed_req.add_bootstrap_info(prefill.as_ref()) { error!("Failed to add bootstrap info: {}", e); - counter!("sgl_router_pd_errors_total", "error" => "bootstrap_injection").increment(1); + RouterMetrics::record_pd_error("bootstrap_injection"); return HttpResponse::InternalServerError() .body(format!("Bootstrap injection failed: {}", e)); } @@ -374,7 +374,7 @@ impl PDRouter { Ok(pair) => pair, Err(e) => { error!("Failed to select PD pair: {}", e); - counter!("sgl_router_pd_errors_total", "error" => "server_selection").increment(1); + RouterMetrics::record_pd_error("server_selection"); return HttpResponse::ServiceUnavailable() .body(format!("No available servers: {}", e)); } @@ -391,7 +391,7 @@ impl PDRouter { // Add bootstrap info using the trait method if let Err(e) = typed_req.add_bootstrap_info(prefill.as_ref()) { error!("Failed to add bootstrap info: {}", e); - counter!("sgl_router_pd_errors_total", "error" => "bootstrap_injection").increment(1); + RouterMetrics::record_pd_error("bootstrap_injection"); return HttpResponse::InternalServerError() .body(format!("Bootstrap injection failed: {}", e)); } @@ -460,13 +460,10 @@ impl PDRouter { // Update metrics let duration = start_time.elapsed(); - histogram!("sgl_router_pd_request_duration_seconds", "route" => route.to_string()) - .record(duration.as_secs_f64()); - counter!("sgl_router_pd_requests_total", "route" => route.to_string()).increment(1); - counter!("sgl_router_pd_prefill_requests_total", "worker" => prefill.url().to_string()) - .increment(1); - counter!("sgl_router_pd_decode_requests_total", "worker" => decode.url().to_string()) - .increment(1); + RouterMetrics::record_pd_request_duration(route, duration); + RouterMetrics::record_pd_request(route); + RouterMetrics::record_pd_prefill_request(prefill.url()); + RouterMetrics::record_pd_decode_request(decode.url()); // Process decode response match decode_result { @@ -475,7 +472,7 @@ impl PDRouter { .unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR); if !status.is_success() { - counter!("sgl_router_pd_decode_errors_total", "worker" => decode.url().to_string()).increment(1); + RouterMetrics::record_pd_decode_error(decode.url()); error!( "Decode server {} returned error status: {}", decode.url(), @@ -501,7 +498,7 @@ impl PDRouter { prefill.url(), e ); - counter!("sgl_router_pd_prefill_errors_total", "worker" => prefill.url().to_string()).increment(1); + RouterMetrics::record_pd_prefill_error(prefill.url()); } if is_stream { @@ -548,13 +545,19 @@ impl PDRouter { } else { // No logprob merging needed HttpResponse::build(status) - .insert_header((CONTENT_TYPE, HeaderValue::from_static("text/event-stream"))) + .insert_header(( + CONTENT_TYPE, + HeaderValue::from_static("text/event-stream"), + )) .streaming({ let decode_url = decode.url().to_string(); res.bytes_stream().map_err(move |e| { error!("Stream error from decode server {}: {}", decode_url, e); - counter!("sgl_router_pd_stream_errors_total", "worker" => decode_url.to_string()).increment(1); - actix_web::error::ErrorInternalServerError(format!("Stream error: {}", e)) + RouterMetrics::record_pd_stream_error(&decode_url); + actix_web::error::ErrorInternalServerError(format!( + "Stream error: {}", + e + )) }) }) } @@ -578,8 +581,7 @@ impl PDRouter { } Err(e) => { error!("Decode request failed: {}", e); - counter!("sgl_router_pd_decode_errors_total", "worker" => decode.url().to_string()) - .increment(1); + RouterMetrics::record_pd_decode_error(decode.url()); HttpResponse::BadGateway().body(format!("Decode server error: {}", e)) } } diff --git a/sgl-router/src/routers/pd_types.rs b/sgl-router/src/routers/pd_types.rs index 75473b0e3..155274b06 100644 --- a/sgl-router/src/routers/pd_types.rs +++ b/sgl-router/src/routers/pd_types.rs @@ -151,13 +151,6 @@ impl GenerateReqInput { if texts.is_empty() { return Err("Batch text array is empty".to_string()); } - if texts.len() > 10000 { - // Reasonable limit for production - return Err(format!( - "Batch size {} exceeds maximum allowed (10000)", - texts.len() - )); - } return Ok(Some(texts.len())); } @@ -166,13 +159,6 @@ impl GenerateReqInput { if ids.is_empty() { return Err("Batch input_ids array is empty".to_string()); } - if ids.len() > 10000 { - // Reasonable limit for production - return Err(format!( - "Batch size {} exceeds maximum allowed (10000)", - ids.len() - )); - } // Validate each sequence is not empty for (i, seq) in ids.iter().enumerate() { if seq.is_empty() { diff --git a/sgl-router/src/routers/router.rs b/sgl-router/src/routers/router.rs index ef44348ec..c198b0c1d 100644 --- a/sgl-router/src/routers/router.rs +++ b/sgl-router/src/routers/router.rs @@ -1,6 +1,6 @@ use crate::core::{HealthChecker, Worker, WorkerFactory}; +use crate::metrics::RouterMetrics; use crate::policies::LoadBalancingPolicy; -use ::metrics::{counter, gauge, histogram}; use actix_web::http::header::{HeaderValue, CONTENT_TYPE}; use actix_web::{HttpRequest, HttpResponse}; use futures_util::{StreamExt, TryStreamExt}; @@ -43,7 +43,7 @@ impl Router { interval_secs: u64, ) -> Result { // Update active workers gauge - gauge!("sgl_router_active_workers").set(worker_urls.len() as f64); + RouterMetrics::set_active_workers(worker_urls.len()); // Wait for workers to be healthy (skip if empty - for service discovery mode) if !worker_urls.is_empty() { @@ -215,13 +215,11 @@ impl Router { // Record request metrics if route != "/health" { let duration = start.elapsed(); - counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1); - histogram!("sgl_router_request_duration_seconds", "route" => route.to_string()) - .record(duration.as_secs_f64()); + RouterMetrics::record_request(route); + RouterMetrics::record_request_duration(route, duration); if !response.status().is_success() { - counter!("sgl_router_request_errors_total", "route" => route.to_string()) - .increment(1); + RouterMetrics::record_request_error(route, "request_failed"); } } response @@ -390,7 +388,7 @@ impl Router { while request_retries < MAX_REQUEST_RETRIES { if total_retries >= 1 { info!("Retrying request after {} failed attempts", total_retries); - counter!("sgl_router_retries_total", "route" => route.to_string()).increment(1); + RouterMetrics::record_retry(route); } // Increment load before request if using RAII load tracking @@ -398,8 +396,7 @@ impl Router { let workers_guard = self.workers.read().unwrap(); if let Some(worker) = workers_guard.iter().find(|w| w.url() == &worker_url) { worker.increment_load(); - gauge!("sgl_router_running_requests", "worker" => worker_url.to_string()) - .set(worker.load() as f64); + RouterMetrics::set_running_requests(&worker_url, worker.load()); true } else { false @@ -423,16 +420,14 @@ impl Router { if response.status().is_success() { let duration = start.elapsed(); - histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string()) - .record(duration.as_secs_f64()); + RouterMetrics::record_generate_duration(duration); return response; } else { // if the worker is healthy, it means the request is bad, so return the error response let health_response = self.send_request(client, &worker_url, "/health", req).await; if health_response.status().is_success() { - counter!("sgl_router_request_errors_total", "route" => route.to_string()) - .increment(1); + RouterMetrics::record_request_error(route, "request_failed"); return response; } } @@ -455,7 +450,7 @@ impl Router { } } - counter!("sgl_router_request_errors_total", "route" => route.to_string()).increment(1); + RouterMetrics::record_request_error(route, "request_failed"); HttpResponse::InternalServerError().body("All retry attempts failed") } @@ -512,8 +507,7 @@ impl Router { if let Ok(workers_guard) = self.workers.read() { if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) { worker.decrement_load(); - gauge!("sgl_router_running_requests", "worker" => worker_url.to_string()) - .set(worker.load() as f64); + RouterMetrics::set_running_requests(&worker_url, worker.load()); } } } @@ -540,17 +534,15 @@ impl Router { if let Ok(workers_guard) = self.workers.read() { if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) { worker.decrement_load(); - gauge!("sgl_router_running_requests", "worker" => worker_url.to_string()) - .set(worker.load() as f64); + RouterMetrics::set_running_requests(&worker_url, worker.load()); } } } // Record metrics let duration = start.elapsed(); - histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string()) - .record(duration.as_secs_f64()); - counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1); + RouterMetrics::record_generate_duration(duration); + RouterMetrics::record_request(route); response } else if load_incremented { @@ -577,8 +569,10 @@ impl Router { workers_guard.iter().find(|w| w.url() == &worker_url) { worker.decrement_load(); - gauge!("sgl_router_running_requests", "worker" => worker_url.to_string()) - .set(worker.load() as f64); + RouterMetrics::set_running_requests( + &worker_url, + worker.load(), + ); debug!("Streaming is done!!") } } @@ -626,7 +620,7 @@ impl Router { info!("Added worker: {}", worker_url); let new_worker = WorkerFactory::create_regular(worker_url.to_string()); workers_guard.push(new_worker); - gauge!("sgl_router_active_workers").set(workers_guard.len() as f64); + RouterMetrics::set_active_workers(workers_guard.len()); // If cache aware policy, initialize the worker in the tree if let Some(cache_aware) = @@ -680,7 +674,7 @@ impl Router { if let Some(index) = workers_guard.iter().position(|w| w.url() == worker_url) { workers_guard.remove(index); info!("Removed worker: {}", worker_url); - gauge!("sgl_router_active_workers").set(workers_guard.len() as f64); + RouterMetrics::set_active_workers(workers_guard.len()); } else { warn!("Worker {} not found, skipping removal", worker_url); return; diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index 69340eefe..83774f172 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -1,7 +1,7 @@ use crate::config::RouterConfig; use crate::logging::{self, LoggingConfig}; +use crate::metrics::{self, PrometheusConfig}; use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest}; -use crate::prometheus::{self, PrometheusConfig}; use crate::routers::{RouterFactory, RouterTrait}; use crate::service_discovery::{start_service_discovery, ServiceDiscoveryConfig}; use actix_web::{ @@ -237,7 +237,7 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { "🚧 Initializing Prometheus metrics on {}:{}", prometheus_config.host, prometheus_config.port ); - prometheus::start_prometheus(prometheus_config); + metrics::start_prometheus(prometheus_config); } else { info!("🚧 Prometheus metrics disabled"); }