[router] router metrics cleanup (#8158)
This commit is contained in:
@@ -4,13 +4,13 @@
|
||||
use super::pd_types::{api_path, Bootstrap, ChatReqInput, GenerateReqInput, PDRouterError};
|
||||
use super::request_adapter::ToPdRequest;
|
||||
use crate::core::{HealthChecker, Worker, WorkerFactory, WorkerLoadGuard};
|
||||
use crate::metrics::RouterMetrics;
|
||||
use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest};
|
||||
use crate::policies::LoadBalancingPolicy;
|
||||
use crate::tree::Tree;
|
||||
use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
|
||||
use actix_web::{HttpRequest, HttpResponse};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use metrics::{counter, histogram};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
@@ -296,7 +296,7 @@ impl PDRouter {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
error!("Failed to select PD pair: {}", e);
|
||||
counter!("sgl_router_pd_errors_total", "error" => "server_selection").increment(1);
|
||||
RouterMetrics::record_pd_error("server_selection");
|
||||
return HttpResponse::ServiceUnavailable()
|
||||
.body(format!("No available servers: {}", e));
|
||||
}
|
||||
@@ -313,7 +313,7 @@ impl PDRouter {
|
||||
// Add bootstrap info using the trait method
|
||||
if let Err(e) = typed_req.add_bootstrap_info(prefill.as_ref()) {
|
||||
error!("Failed to add bootstrap info: {}", e);
|
||||
counter!("sgl_router_pd_errors_total", "error" => "bootstrap_injection").increment(1);
|
||||
RouterMetrics::record_pd_error("bootstrap_injection");
|
||||
return HttpResponse::InternalServerError()
|
||||
.body(format!("Bootstrap injection failed: {}", e));
|
||||
}
|
||||
@@ -374,7 +374,7 @@ impl PDRouter {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => {
|
||||
error!("Failed to select PD pair: {}", e);
|
||||
counter!("sgl_router_pd_errors_total", "error" => "server_selection").increment(1);
|
||||
RouterMetrics::record_pd_error("server_selection");
|
||||
return HttpResponse::ServiceUnavailable()
|
||||
.body(format!("No available servers: {}", e));
|
||||
}
|
||||
@@ -391,7 +391,7 @@ impl PDRouter {
|
||||
// Add bootstrap info using the trait method
|
||||
if let Err(e) = typed_req.add_bootstrap_info(prefill.as_ref()) {
|
||||
error!("Failed to add bootstrap info: {}", e);
|
||||
counter!("sgl_router_pd_errors_total", "error" => "bootstrap_injection").increment(1);
|
||||
RouterMetrics::record_pd_error("bootstrap_injection");
|
||||
return HttpResponse::InternalServerError()
|
||||
.body(format!("Bootstrap injection failed: {}", e));
|
||||
}
|
||||
@@ -460,13 +460,10 @@ impl PDRouter {
|
||||
|
||||
// Update metrics
|
||||
let duration = start_time.elapsed();
|
||||
histogram!("sgl_router_pd_request_duration_seconds", "route" => route.to_string())
|
||||
.record(duration.as_secs_f64());
|
||||
counter!("sgl_router_pd_requests_total", "route" => route.to_string()).increment(1);
|
||||
counter!("sgl_router_pd_prefill_requests_total", "worker" => prefill.url().to_string())
|
||||
.increment(1);
|
||||
counter!("sgl_router_pd_decode_requests_total", "worker" => decode.url().to_string())
|
||||
.increment(1);
|
||||
RouterMetrics::record_pd_request_duration(route, duration);
|
||||
RouterMetrics::record_pd_request(route);
|
||||
RouterMetrics::record_pd_prefill_request(prefill.url());
|
||||
RouterMetrics::record_pd_decode_request(decode.url());
|
||||
|
||||
// Process decode response
|
||||
match decode_result {
|
||||
@@ -475,7 +472,7 @@ impl PDRouter {
|
||||
.unwrap_or(actix_web::http::StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
if !status.is_success() {
|
||||
counter!("sgl_router_pd_decode_errors_total", "worker" => decode.url().to_string()).increment(1);
|
||||
RouterMetrics::record_pd_decode_error(decode.url());
|
||||
error!(
|
||||
"Decode server {} returned error status: {}",
|
||||
decode.url(),
|
||||
@@ -501,7 +498,7 @@ impl PDRouter {
|
||||
prefill.url(),
|
||||
e
|
||||
);
|
||||
counter!("sgl_router_pd_prefill_errors_total", "worker" => prefill.url().to_string()).increment(1);
|
||||
RouterMetrics::record_pd_prefill_error(prefill.url());
|
||||
}
|
||||
|
||||
if is_stream {
|
||||
@@ -548,13 +545,19 @@ impl PDRouter {
|
||||
} else {
|
||||
// No logprob merging needed
|
||||
HttpResponse::build(status)
|
||||
.insert_header((CONTENT_TYPE, HeaderValue::from_static("text/event-stream")))
|
||||
.insert_header((
|
||||
CONTENT_TYPE,
|
||||
HeaderValue::from_static("text/event-stream"),
|
||||
))
|
||||
.streaming({
|
||||
let decode_url = decode.url().to_string();
|
||||
res.bytes_stream().map_err(move |e| {
|
||||
error!("Stream error from decode server {}: {}", decode_url, e);
|
||||
counter!("sgl_router_pd_stream_errors_total", "worker" => decode_url.to_string()).increment(1);
|
||||
actix_web::error::ErrorInternalServerError(format!("Stream error: {}", e))
|
||||
RouterMetrics::record_pd_stream_error(&decode_url);
|
||||
actix_web::error::ErrorInternalServerError(format!(
|
||||
"Stream error: {}",
|
||||
e
|
||||
))
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -578,8 +581,7 @@ impl PDRouter {
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Decode request failed: {}", e);
|
||||
counter!("sgl_router_pd_decode_errors_total", "worker" => decode.url().to_string())
|
||||
.increment(1);
|
||||
RouterMetrics::record_pd_decode_error(decode.url());
|
||||
HttpResponse::BadGateway().body(format!("Decode server error: {}", e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,13 +151,6 @@ impl GenerateReqInput {
|
||||
if texts.is_empty() {
|
||||
return Err("Batch text array is empty".to_string());
|
||||
}
|
||||
if texts.len() > 10000 {
|
||||
// Reasonable limit for production
|
||||
return Err(format!(
|
||||
"Batch size {} exceeds maximum allowed (10000)",
|
||||
texts.len()
|
||||
));
|
||||
}
|
||||
return Ok(Some(texts.len()));
|
||||
}
|
||||
|
||||
@@ -166,13 +159,6 @@ impl GenerateReqInput {
|
||||
if ids.is_empty() {
|
||||
return Err("Batch input_ids array is empty".to_string());
|
||||
}
|
||||
if ids.len() > 10000 {
|
||||
// Reasonable limit for production
|
||||
return Err(format!(
|
||||
"Batch size {} exceeds maximum allowed (10000)",
|
||||
ids.len()
|
||||
));
|
||||
}
|
||||
// Validate each sequence is not empty
|
||||
for (i, seq) in ids.iter().enumerate() {
|
||||
if seq.is_empty() {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::core::{HealthChecker, Worker, WorkerFactory};
|
||||
use crate::metrics::RouterMetrics;
|
||||
use crate::policies::LoadBalancingPolicy;
|
||||
use ::metrics::{counter, gauge, histogram};
|
||||
use actix_web::http::header::{HeaderValue, CONTENT_TYPE};
|
||||
use actix_web::{HttpRequest, HttpResponse};
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
@@ -43,7 +43,7 @@ impl Router {
|
||||
interval_secs: u64,
|
||||
) -> Result<Self, String> {
|
||||
// Update active workers gauge
|
||||
gauge!("sgl_router_active_workers").set(worker_urls.len() as f64);
|
||||
RouterMetrics::set_active_workers(worker_urls.len());
|
||||
|
||||
// Wait for workers to be healthy (skip if empty - for service discovery mode)
|
||||
if !worker_urls.is_empty() {
|
||||
@@ -215,13 +215,11 @@ impl Router {
|
||||
// Record request metrics
|
||||
if route != "/health" {
|
||||
let duration = start.elapsed();
|
||||
counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1);
|
||||
histogram!("sgl_router_request_duration_seconds", "route" => route.to_string())
|
||||
.record(duration.as_secs_f64());
|
||||
RouterMetrics::record_request(route);
|
||||
RouterMetrics::record_request_duration(route, duration);
|
||||
|
||||
if !response.status().is_success() {
|
||||
counter!("sgl_router_request_errors_total", "route" => route.to_string())
|
||||
.increment(1);
|
||||
RouterMetrics::record_request_error(route, "request_failed");
|
||||
}
|
||||
}
|
||||
response
|
||||
@@ -390,7 +388,7 @@ impl Router {
|
||||
while request_retries < MAX_REQUEST_RETRIES {
|
||||
if total_retries >= 1 {
|
||||
info!("Retrying request after {} failed attempts", total_retries);
|
||||
counter!("sgl_router_retries_total", "route" => route.to_string()).increment(1);
|
||||
RouterMetrics::record_retry(route);
|
||||
}
|
||||
|
||||
// Increment load before request if using RAII load tracking
|
||||
@@ -398,8 +396,7 @@ impl Router {
|
||||
let workers_guard = self.workers.read().unwrap();
|
||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == &worker_url) {
|
||||
worker.increment_load();
|
||||
gauge!("sgl_router_running_requests", "worker" => worker_url.to_string())
|
||||
.set(worker.load() as f64);
|
||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@@ -423,16 +420,14 @@ impl Router {
|
||||
|
||||
if response.status().is_success() {
|
||||
let duration = start.elapsed();
|
||||
histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string())
|
||||
.record(duration.as_secs_f64());
|
||||
RouterMetrics::record_generate_duration(duration);
|
||||
return response;
|
||||
} else {
|
||||
// if the worker is healthy, it means the request is bad, so return the error response
|
||||
let health_response =
|
||||
self.send_request(client, &worker_url, "/health", req).await;
|
||||
if health_response.status().is_success() {
|
||||
counter!("sgl_router_request_errors_total", "route" => route.to_string())
|
||||
.increment(1);
|
||||
RouterMetrics::record_request_error(route, "request_failed");
|
||||
return response;
|
||||
}
|
||||
}
|
||||
@@ -455,7 +450,7 @@ impl Router {
|
||||
}
|
||||
}
|
||||
|
||||
counter!("sgl_router_request_errors_total", "route" => route.to_string()).increment(1);
|
||||
RouterMetrics::record_request_error(route, "request_failed");
|
||||
HttpResponse::InternalServerError().body("All retry attempts failed")
|
||||
}
|
||||
|
||||
@@ -512,8 +507,7 @@ impl Router {
|
||||
if let Ok(workers_guard) = self.workers.read() {
|
||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
||||
worker.decrement_load();
|
||||
gauge!("sgl_router_running_requests", "worker" => worker_url.to_string())
|
||||
.set(worker.load() as f64);
|
||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -540,17 +534,15 @@ impl Router {
|
||||
if let Ok(workers_guard) = self.workers.read() {
|
||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
||||
worker.decrement_load();
|
||||
gauge!("sgl_router_running_requests", "worker" => worker_url.to_string())
|
||||
.set(worker.load() as f64);
|
||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Record metrics
|
||||
let duration = start.elapsed();
|
||||
histogram!("sgl_router_generate_duration_seconds", "route" => route.to_string())
|
||||
.record(duration.as_secs_f64());
|
||||
counter!("sgl_router_requests_total", "route" => route.to_string()).increment(1);
|
||||
RouterMetrics::record_generate_duration(duration);
|
||||
RouterMetrics::record_request(route);
|
||||
|
||||
response
|
||||
} else if load_incremented {
|
||||
@@ -577,8 +569,10 @@ impl Router {
|
||||
workers_guard.iter().find(|w| w.url() == &worker_url)
|
||||
{
|
||||
worker.decrement_load();
|
||||
gauge!("sgl_router_running_requests", "worker" => worker_url.to_string())
|
||||
.set(worker.load() as f64);
|
||||
RouterMetrics::set_running_requests(
|
||||
&worker_url,
|
||||
worker.load(),
|
||||
);
|
||||
debug!("Streaming is done!!")
|
||||
}
|
||||
}
|
||||
@@ -626,7 +620,7 @@ impl Router {
|
||||
info!("Added worker: {}", worker_url);
|
||||
let new_worker = WorkerFactory::create_regular(worker_url.to_string());
|
||||
workers_guard.push(new_worker);
|
||||
gauge!("sgl_router_active_workers").set(workers_guard.len() as f64);
|
||||
RouterMetrics::set_active_workers(workers_guard.len());
|
||||
|
||||
// If cache aware policy, initialize the worker in the tree
|
||||
if let Some(cache_aware) =
|
||||
@@ -680,7 +674,7 @@ impl Router {
|
||||
if let Some(index) = workers_guard.iter().position(|w| w.url() == worker_url) {
|
||||
workers_guard.remove(index);
|
||||
info!("Removed worker: {}", worker_url);
|
||||
gauge!("sgl_router_active_workers").set(workers_guard.len() as f64);
|
||||
RouterMetrics::set_active_workers(workers_guard.len());
|
||||
} else {
|
||||
warn!("Worker {} not found, skipping removal", worker_url);
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user