From 9d9fa9a537c054fa1d677834ec291024be328a5d Mon Sep 17 00:00:00 2001 From: LukasBluebaum <38468743+LukasBluebaum@users.noreply.github.com> Date: Tue, 2 Sep 2025 04:57:04 +0200 Subject: [PATCH] [router] Fix short timeout for the prefill client (#9803) --- .github/workflows/pr-test-pd-router.yml | 8 +-- sgl-router/src/routers/factory.rs | 1 + sgl-router/src/routers/http/pd_router.rs | 29 +++++----- sgl-router/src/routers/http/router.rs | 73 +++++++++++++++--------- 4 files changed, 66 insertions(+), 45 deletions(-) diff --git a/.github/workflows/pr-test-pd-router.yml b/.github/workflows/pr-test-pd-router.yml index 689231717..3d34a5d58 100644 --- a/.github/workflows/pr-test-pd-router.yml +++ b/.github/workflows/pr-test-pd-router.yml @@ -305,10 +305,10 @@ jobs: # Set mean thresholds (allowing for reasonable variance) # These can be adjusted based on your performance requirements - ttft_threshold=2.0 # Max 2.0 seconds for mean TTFT - e2e_latency_threshold=24.0 # Max 8.0 seconds for mean E2E latency - input_throughput_threshold=10000 # Min 9000 tokens/s for mean input throughput - output_throughput_threshold=90 # Min 100 tokens/s for mean output throughput + ttft_threshold=4.7 # Max 4.7 seconds for mean TTFT + e2e_latency_threshold=35.0 # Max 35.0 seconds for mean E2E latency + input_throughput_threshold=12000 # Min 12000 tokens/s for mean input throughput + output_throughput_threshold=68 # Min 68 tokens/s for mean output throughput # Validate mean thresholds diff --git a/sgl-router/src/routers/factory.rs b/sgl-router/src/routers/factory.rs index c0a4aa6d0..94845fdfb 100644 --- a/sgl-router/src/routers/factory.rs +++ b/sgl-router/src/routers/factory.rs @@ -95,6 +95,7 @@ impl RouterFactory { prefill_policy, decode_policy, ctx.client.clone(), + ctx.router_config.request_timeout_secs, ctx.router_config.worker_startup_timeout_secs, ctx.router_config.worker_startup_check_interval_secs, ctx.router_config.retry.clone(), diff --git a/sgl-router/src/routers/http/pd_router.rs b/sgl-router/src/routers/http/pd_router.rs index 887be65c4..beb40e45e 100644 --- a/sgl-router/src/routers/http/pd_router.rs +++ b/sgl-router/src/routers/http/pd_router.rs @@ -42,8 +42,8 @@ pub struct PDRouter { pub decode_workers: Arc>>>, pub prefill_policy: Arc, pub decode_policy: Arc, - pub timeout_secs: u64, - pub interval_secs: u64, + pub worker_startup_timeout_secs: u64, + pub worker_startup_check_interval_secs: u64, pub worker_loads: Arc>>, pub load_monitor_handle: Option>>, pub client: Client, @@ -74,8 +74,8 @@ impl PDRouter { async fn wait_for_server_health(&self, url: &str) -> Result<(), PDRouterError> { crate::routers::http::router::Router::wait_for_healthy_workers( &[url.to_string()], - self.timeout_secs, - self.interval_secs, + self.worker_startup_timeout_secs, + self.worker_startup_check_interval_secs, ) .await .map_err(|_| PDRouterError::HealthCheckFailed { @@ -376,8 +376,9 @@ impl PDRouter { prefill_policy: Arc, decode_policy: Arc, client: Client, - timeout_secs: u64, - interval_secs: u64, + prefill_request_timeout_secs: u64, + worker_startup_timeout_secs: u64, + worker_startup_check_interval_secs: u64, retry_config: RetryConfig, circuit_breaker_config: ConfigCircuitBreakerConfig, health_check_config: ConfigHealthCheckConfig, @@ -437,8 +438,8 @@ impl PDRouter { if !all_urls.is_empty() { crate::routers::http::router::Router::wait_for_healthy_workers( &all_urls, - timeout_secs, - interval_secs, + worker_startup_timeout_secs, + worker_startup_check_interval_secs, ) .await?; } @@ -465,7 +466,7 @@ impl PDRouter { let load_monitor_handle = if prefill_policy.name() == "power_of_two" || decode_policy.name() == "power_of_two" { let monitor_urls = all_urls.clone(); - let monitor_interval = interval_secs; + let monitor_interval = worker_startup_check_interval_secs; let monitor_client = client.clone(); let prefill_policy_clone = Arc::clone(&prefill_policy); let decode_policy_clone = Arc::clone(&decode_policy); @@ -503,7 +504,7 @@ impl PDRouter { .pool_max_idle_per_host(0) .http1_only() .connect_timeout(Duration::from_millis(300)) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(prefill_request_timeout_secs)) .build() .map_err(|e| format!("Failed to build prefill client: {}", e))?; @@ -581,8 +582,8 @@ impl PDRouter { decode_workers, prefill_policy, decode_policy, - timeout_secs, - interval_secs, + worker_startup_timeout_secs, + worker_startup_check_interval_secs, worker_loads, load_monitor_handle, client, @@ -2104,8 +2105,8 @@ mod tests { decode_workers: Arc::new(RwLock::new(vec![])), prefill_policy, decode_policy, - timeout_secs: 5, - interval_secs: 1, + worker_startup_timeout_secs: 5, + worker_startup_check_interval_secs: 1, worker_loads: Arc::new(tokio::sync::watch::channel(HashMap::new()).1), load_monitor_handle: None, client: Client::new(), diff --git a/sgl-router/src/routers/http/router.rs b/sgl-router/src/routers/http/router.rs index 6e63c7f4a..963bef4aa 100644 --- a/sgl-router/src/routers/http/router.rs +++ b/sgl-router/src/routers/http/router.rs @@ -34,8 +34,8 @@ pub struct Router { workers: Arc>>>, policy: Arc, client: Client, - timeout_secs: u64, - interval_secs: u64, + worker_startup_timeout_secs: u64, + worker_startup_check_interval_secs: u64, dp_aware: bool, api_key: Option, retry_config: RetryConfig, @@ -52,8 +52,8 @@ impl Router { worker_urls: Vec, policy: Arc, client: Client, - timeout_secs: u64, - interval_secs: u64, + worker_startup_timeout_secs: u64, + worker_startup_check_interval_secs: u64, dp_aware: bool, api_key: Option, retry_config: RetryConfig, @@ -65,7 +65,12 @@ impl Router { // Wait for workers to be healthy (skip if empty - for service discovery mode) if !worker_urls.is_empty() { - Self::wait_for_healthy_workers(&worker_urls, timeout_secs, interval_secs).await?; + Self::wait_for_healthy_workers( + &worker_urls, + worker_startup_timeout_secs, + worker_startup_check_interval_secs, + ) + .await?; } let worker_urls = if dp_aware { @@ -110,7 +115,10 @@ impl Router { } let workers = Arc::new(RwLock::new(workers)); - let health_checker = crate::core::start_health_checker(Arc::clone(&workers), interval_secs); + let health_checker = crate::core::start_health_checker( + Arc::clone(&workers), + worker_startup_check_interval_secs, + ); // Setup load monitoring for PowerOfTwo policy let (tx, rx) = tokio::sync::watch::channel(HashMap::new()); @@ -118,7 +126,7 @@ impl Router { let load_monitor_handle = if policy.name() == "power_of_two" { let monitor_urls = worker_urls.clone(); - let monitor_interval = interval_secs; + let monitor_interval = worker_startup_check_interval_secs; let policy_clone = Arc::clone(&policy); let client_clone = client.clone(); @@ -140,8 +148,8 @@ impl Router { workers, policy, client, - timeout_secs, - interval_secs, + worker_startup_timeout_secs, + worker_startup_check_interval_secs, dp_aware, api_key, retry_config, @@ -164,8 +172,8 @@ impl Router { pub async fn wait_for_healthy_workers( worker_urls: &[String], - timeout_secs: u64, - interval_secs: u64, + worker_startup_timeout_secs: u64, + worker_startup_check_interval_secs: u64, ) -> Result<(), String> { if worker_urls.is_empty() { return Err( @@ -174,18 +182,23 @@ impl Router { } // Perform health check asynchronously - Self::wait_for_healthy_workers_async(worker_urls, timeout_secs, interval_secs).await + Self::wait_for_healthy_workers_async( + worker_urls, + worker_startup_timeout_secs, + worker_startup_check_interval_secs, + ) + .await } async fn wait_for_healthy_workers_async( worker_urls: &[String], - timeout_secs: u64, - interval_secs: u64, + worker_startup_timeout_secs: u64, + worker_startup_check_interval_secs: u64, ) -> Result<(), String> { info!( "Waiting for {} workers to become healthy (timeout: {}s)", worker_urls.len(), - timeout_secs + worker_startup_timeout_secs ); let start_time = std::time::Instant::now(); @@ -195,14 +208,14 @@ impl Router { .map_err(|e| format!("Failed to create HTTP client: {}", e))?; loop { - if start_time.elapsed() > Duration::from_secs(timeout_secs) { + if start_time.elapsed() > Duration::from_secs(worker_startup_timeout_secs) { error!( "Timeout {}s waiting for workers {:?} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value", - timeout_secs, worker_urls + worker_startup_timeout_secs, worker_urls ); return Err(format!( "Timeout {}s waiting for workers {:?} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value", - timeout_secs, worker_urls + worker_startup_timeout_secs, worker_urls )); } @@ -262,7 +275,7 @@ impl Router { unhealthy_workers.len(), unhealthy_workers ); - tokio::time::sleep(Duration::from_secs(interval_secs)).await; + tokio::time::sleep(Duration::from_secs(worker_startup_check_interval_secs)).await; } } } @@ -812,19 +825,19 @@ impl Router { pub async fn add_worker(&self, worker_url: &str) -> Result { let start_time = std::time::Instant::now(); let client = reqwest::Client::builder() - .timeout(Duration::from_secs(self.timeout_secs)) + .timeout(Duration::from_secs(self.worker_startup_timeout_secs)) .build() .map_err(|e| format!("Failed to create HTTP client: {}", e))?; loop { - if start_time.elapsed() > Duration::from_secs(self.timeout_secs) { + if start_time.elapsed() > Duration::from_secs(self.worker_startup_timeout_secs) { error!( "Timeout {}s waiting for worker {} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value", - self.timeout_secs, worker_url + self.worker_startup_timeout_secs, worker_url ); return Err(format!( "Timeout {}s waiting for worker {} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value", - self.timeout_secs, worker_url + self.worker_startup_timeout_secs, worker_url )); } @@ -894,7 +907,10 @@ impl Router { warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url); } - tokio::time::sleep(Duration::from_secs(self.interval_secs)).await; + tokio::time::sleep(Duration::from_secs( + self.worker_startup_check_interval_secs, + )) + .await; continue; } } @@ -906,7 +922,10 @@ impl Router { warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url); } - tokio::time::sleep(Duration::from_secs(self.interval_secs)).await; + tokio::time::sleep(Duration::from_secs( + self.worker_startup_check_interval_secs, + )) + .await; continue; } } @@ -1324,8 +1343,8 @@ mod tests { Router { workers: Arc::new(RwLock::new(workers)), policy: Arc::new(RandomPolicy::new()), - timeout_secs: 5, - interval_secs: 1, + worker_startup_timeout_secs: 5, + worker_startup_check_interval_secs: 1, dp_aware: false, api_key: None, client: Client::new(),