From 36bfddecb97aeca9e862395edba2f082e3c01ec5 Mon Sep 17 00:00:00 2001 From: Tony Lu Date: Sat, 9 Aug 2025 04:41:40 +0800 Subject: [PATCH] [router] add metrics for worker and policy (#8971) Signed-off-by: Tony Lu --- sgl-router/src/core/worker.rs | 2 ++ sgl-router/src/policies/cache_aware.rs | 1 + sgl-router/src/policies/power_of_two.rs | 1 + sgl-router/src/policies/random.rs | 5 +++++ sgl-router/src/policies/round_robin.rs | 4 ++++ 5 files changed, 13 insertions(+) diff --git a/sgl-router/src/core/worker.rs b/sgl-router/src/core/worker.rs index d22a69abc..f9cb76430 100644 --- a/sgl-router/src/core/worker.rs +++ b/sgl-router/src/core/worker.rs @@ -1,4 +1,5 @@ use super::{CircuitBreaker, CircuitBreakerConfig, WorkerError, WorkerResult}; +use crate::metrics::RouterMetrics; use async_trait::async_trait; use futures; use serde_json; @@ -259,6 +260,7 @@ impl Worker for BasicWorker { fn set_healthy(&self, healthy: bool) { self.healthy.store(healthy, Ordering::Release); + RouterMetrics::set_worker_health(self.url(), healthy); } async fn check_health_async(&self) -> WorkerResult<()> { diff --git a/sgl-router/src/policies/cache_aware.rs b/sgl-router/src/policies/cache_aware.rs index 74061cb49..922ba85e0 100644 --- a/sgl-router/src/policies/cache_aware.rs +++ b/sgl-router/src/policies/cache_aware.rs @@ -181,6 +181,7 @@ impl LoadBalancingPolicy for CacheAwarePolicy { // Increment processed counter workers[min_load_idx].increment_processed(); RouterMetrics::record_processed_request(workers[min_load_idx].url()); + RouterMetrics::record_policy_decision(self.name(), workers[min_load_idx].url()); return Some(min_load_idx); } diff --git a/sgl-router/src/policies/power_of_two.rs b/sgl-router/src/policies/power_of_two.rs index 2167273ae..37078f9f3 100644 --- a/sgl-router/src/policies/power_of_two.rs +++ b/sgl-router/src/policies/power_of_two.rs @@ -90,6 +90,7 @@ impl LoadBalancingPolicy for PowerOfTwoPolicy { // Increment processed counter workers[selected_idx].increment_processed(); RouterMetrics::record_processed_request(workers[selected_idx].url()); + RouterMetrics::record_policy_decision(self.name(), workers[selected_idx].url()); Some(selected_idx) } diff --git a/sgl-router/src/policies/random.rs b/sgl-router/src/policies/random.rs index 50920bdf1..bc299bce4 100644 --- a/sgl-router/src/policies/random.rs +++ b/sgl-router/src/policies/random.rs @@ -2,6 +2,7 @@ use super::{get_healthy_worker_indices, LoadBalancingPolicy}; use crate::core::Worker; +use crate::metrics::RouterMetrics; use rand::Rng; /// Random selection policy @@ -30,6 +31,10 @@ impl LoadBalancingPolicy for RandomPolicy { let mut rng = rand::thread_rng(); let random_idx = rng.gen_range(0..healthy_indices.len()); + let worker = workers[healthy_indices[random_idx]].url(); + + RouterMetrics::record_processed_request(worker); + RouterMetrics::record_policy_decision(self.name(), worker); Some(healthy_indices[random_idx]) } diff --git a/sgl-router/src/policies/round_robin.rs b/sgl-router/src/policies/round_robin.rs index 4401605f0..fcb60233f 100644 --- a/sgl-router/src/policies/round_robin.rs +++ b/sgl-router/src/policies/round_robin.rs @@ -2,6 +2,7 @@ use super::{get_healthy_worker_indices, LoadBalancingPolicy}; use crate::core::Worker; +use crate::metrics::RouterMetrics; use std::sync::atomic::{AtomicUsize, Ordering}; /// Round-robin selection policy @@ -35,7 +36,10 @@ impl LoadBalancingPolicy for RoundRobinPolicy { // Get and increment counter atomically let count = self.counter.fetch_add(1, Ordering::Relaxed); let selected_idx = count % healthy_indices.len(); + let worker = workers[healthy_indices[selected_idx]].url(); + RouterMetrics::record_processed_request(worker); + RouterMetrics::record_policy_decision(self.name(), worker); Some(healthy_indices[selected_idx]) }