From 21b884606676ac30e3dc0d52ee38e2e5409445ca Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Fri, 15 Aug 2025 08:07:45 -0700 Subject: [PATCH] [router] allow more health check configuration (#9198) --- .../py_src/sglang_router/launch_router.py | 103 +++++++++++++---- sgl-router/py_src/sglang_router/router.py | 53 +++++---- sgl-router/src/config/types.rs | 76 +++++++++---- sgl-router/src/core/mod.rs | 4 +- sgl-router/src/core/worker.rs | 106 +++++++++--------- sgl-router/src/lib.rs | 67 +++++++---- sgl-router/src/main.rs | 70 ++++++++---- sgl-router/src/routers/factory.rs | 2 + sgl-router/src/routers/pd_router.rs | 53 +++++++-- sgl-router/src/routers/router.rs | 22 +++- sgl-router/src/service_discovery.rs | 1 + sgl-router/tests/api_endpoints_test.rs | 4 + sgl-router/tests/request_formats_test.rs | 1 + sgl-router/tests/streaming_tests.rs | 1 + sgl-router/tests/test_pd_routing.rs | 1 + 15 files changed, 398 insertions(+), 166 deletions(-) diff --git a/sgl-router/py_src/sglang_router/launch_router.py b/sgl-router/py_src/sglang_router/launch_router.py index 04dc53d77..4adf9eb71 100644 --- a/sgl-router/py_src/sglang_router/launch_router.py +++ b/sgl-router/py_src/sglang_router/launch_router.py @@ -42,14 +42,14 @@ class RouterArgs: policy: str = "cache_aware" prefill_policy: Optional[str] = None # Specific policy for prefill nodes in PD mode decode_policy: Optional[str] = None # Specific policy for decode nodes in PD mode - worker_startup_timeout_secs: int = 300 - worker_startup_check_interval: int = 10 - cache_threshold: float = 0.5 - balance_abs_threshold: int = 32 - balance_rel_threshold: float = 1.0001 - eviction_interval: int = 60 - max_tree_size: int = 2**24 - max_payload_size: int = 256 * 1024 * 1024 # 256MB default for large batches + worker_startup_timeout_secs: int = 600 + worker_startup_check_interval: int = 30 + cache_threshold: float = 0.3 + balance_abs_threshold: int = 64 + balance_rel_threshold: float = 1.5 + eviction_interval: int = 120 + max_tree_size: int = 2**26 + max_payload_size: int = 512 * 1024 * 1024 # 512MB default for large batches dp_aware: bool = False api_key: Optional[str] = None log_dir: Optional[str] = None @@ -69,23 +69,29 @@ class RouterArgs: # Request ID headers configuration request_id_headers: Optional[List[str]] = None # Request timeout in seconds - request_timeout_secs: int = 600 + request_timeout_secs: int = 1800 # Max concurrent requests for rate limiting - max_concurrent_requests: int = 64 + max_concurrent_requests: int = 256 # CORS allowed origins cors_allowed_origins: List[str] = dataclasses.field(default_factory=list) # Retry configuration - retry_max_retries: int = 3 - retry_initial_backoff_ms: int = 100 - retry_max_backoff_ms: int = 10_000 - retry_backoff_multiplier: float = 2.0 - retry_jitter_factor: float = 0.1 + retry_max_retries: int = 5 + retry_initial_backoff_ms: int = 50 + retry_max_backoff_ms: int = 30_000 + retry_backoff_multiplier: float = 1.5 + retry_jitter_factor: float = 0.2 disable_retries: bool = False + # Health check configuration + health_failure_threshold: int = 3 + health_success_threshold: int = 2 + health_check_timeout_secs: int = 5 + health_check_interval_secs: int = 60 + health_check_endpoint: str = "/health" # Circuit breaker configuration - cb_failure_threshold: int = 5 - cb_success_threshold: int = 2 - cb_timeout_duration_secs: int = 30 - cb_window_duration_secs: int = 60 + cb_failure_threshold: int = 10 + cb_success_threshold: int = 3 + cb_timeout_duration_secs: int = 60 + cb_window_duration_secs: int = 120 disable_circuit_breaker: bool = False @staticmethod @@ -359,6 +365,37 @@ class RouterArgs: action="store_true", help="Disable circuit breaker (equivalent to setting cb_failure_threshold to u32::MAX)", ) + # Health check configuration + parser.add_argument( + f"--{prefix}health-failure-threshold", + type=int, + default=RouterArgs.health_failure_threshold, + help="Number of consecutive health check failures before marking worker unhealthy", + ) + parser.add_argument( + f"--{prefix}health-success-threshold", + type=int, + default=RouterArgs.health_success_threshold, + help="Number of consecutive health check successes before marking worker healthy", + ) + parser.add_argument( + f"--{prefix}health-check-timeout-secs", + type=int, + default=RouterArgs.health_check_timeout_secs, + help="Timeout in seconds for health check requests", + ) + parser.add_argument( + f"--{prefix}health-check-interval-secs", + type=int, + default=RouterArgs.health_check_interval_secs, + help="Interval in seconds between runtime health checks", + ) + parser.add_argument( + f"--{prefix}health-check-endpoint", + type=str, + default=RouterArgs.health_check_endpoint, + help="Health check endpoint path", + ) parser.add_argument( f"--{prefix}max-concurrent-requests", type=int, @@ -455,6 +492,29 @@ class RouterArgs: disable_circuit_breaker=getattr( args, f"{prefix}disable_circuit_breaker", False ), + health_failure_threshold=getattr( + args, + f"{prefix}health_failure_threshold", + RouterArgs.health_failure_threshold, + ), + health_success_threshold=getattr( + args, + f"{prefix}health_success_threshold", + RouterArgs.health_success_threshold, + ), + health_check_timeout_secs=getattr( + args, + f"{prefix}health_check_timeout_secs", + RouterArgs.health_check_timeout_secs, + ), + health_check_interval_secs=getattr( + args, + f"{prefix}health_check_interval_secs", + RouterArgs.health_check_interval_secs, + ), + health_check_endpoint=getattr( + args, f"{prefix}health_check_endpoint", RouterArgs.health_check_endpoint + ), ) @staticmethod @@ -652,6 +712,11 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: cb_window_duration_secs=router_args.cb_window_duration_secs, disable_retries=router_args.disable_retries, disable_circuit_breaker=router_args.disable_circuit_breaker, + health_failure_threshold=router_args.health_failure_threshold, + health_success_threshold=router_args.health_success_threshold, + health_check_timeout_secs=router_args.health_check_timeout_secs, + health_check_interval_secs=router_args.health_check_interval_secs, + health_check_endpoint=router_args.health_check_endpoint, ) router.start() diff --git a/sgl-router/py_src/sglang_router/router.py b/sgl-router/py_src/sglang_router/router.py index 9530641a2..9abed9d96 100644 --- a/sgl-router/py_src/sglang_router/router.py +++ b/sgl-router/py_src/sglang_router/router.py @@ -66,6 +66,11 @@ class Router: request_timeout_secs: Request timeout in seconds. Default: 600 max_concurrent_requests: Maximum number of concurrent requests allowed for rate limiting. Default: 64 cors_allowed_origins: List of allowed origins for CORS. Empty list allows all origins. Default: [] + health_failure_threshold: Number of consecutive health check failures before marking worker unhealthy. Default: 3 + health_success_threshold: Number of consecutive health check successes before marking worker healthy. Default: 2 + health_check_timeout_secs: Timeout in seconds for health check requests. Default: 5 + health_check_interval_secs: Interval in seconds between runtime health checks. Default: 60 + health_check_endpoint: Health check endpoint path. Default: '/health' """ def __init__( @@ -74,14 +79,14 @@ class Router: policy: PolicyType = PolicyType.RoundRobin, host: str = "127.0.0.1", port: int = 3001, - worker_startup_timeout_secs: int = 300, - worker_startup_check_interval: int = 10, - cache_threshold: float = 0.50, - balance_abs_threshold: int = 32, - balance_rel_threshold: float = 1.0001, - eviction_interval_secs: int = 60, - max_tree_size: int = 2**24, - max_payload_size: int = 256 * 1024 * 1024, # 256MB + worker_startup_timeout_secs: int = 600, + worker_startup_check_interval: int = 30, + cache_threshold: float = 0.3, + balance_abs_threshold: int = 64, + balance_rel_threshold: float = 1.5, + eviction_interval_secs: int = 120, + max_tree_size: int = 2**26, + max_payload_size: int = 512 * 1024 * 1024, # 512MB dp_aware: bool = False, api_key: Optional[str] = None, log_dir: Optional[str] = None, @@ -95,26 +100,31 @@ class Router: bootstrap_port_annotation: str = "sglang.ai/bootstrap-port", prometheus_port: Optional[int] = None, prometheus_host: Optional[str] = None, - request_timeout_secs: int = 600, + request_timeout_secs: int = 1800, request_id_headers: Optional[List[str]] = None, pd_disaggregation: bool = False, prefill_urls: Optional[List[tuple]] = None, decode_urls: Optional[List[str]] = None, prefill_policy: Optional[PolicyType] = None, decode_policy: Optional[PolicyType] = None, - max_concurrent_requests: int = 64, + max_concurrent_requests: int = 256, cors_allowed_origins: List[str] = None, - retry_max_retries: int = 3, - retry_initial_backoff_ms: int = 100, - retry_max_backoff_ms: int = 10_000, - retry_backoff_multiplier: float = 2.0, - retry_jitter_factor: float = 0.1, - cb_failure_threshold: int = 5, - cb_success_threshold: int = 2, - cb_timeout_duration_secs: int = 30, - cb_window_duration_secs: int = 60, + retry_max_retries: int = 5, + retry_initial_backoff_ms: int = 50, + retry_max_backoff_ms: int = 30_000, + retry_backoff_multiplier: float = 1.5, + retry_jitter_factor: float = 0.2, + cb_failure_threshold: int = 10, + cb_success_threshold: int = 3, + cb_timeout_duration_secs: int = 60, + cb_window_duration_secs: int = 120, disable_retries: bool = False, disable_circuit_breaker: bool = False, + health_failure_threshold: int = 3, + health_success_threshold: int = 2, + health_check_timeout_secs: int = 5, + health_check_interval_secs: int = 60, + health_check_endpoint: str = "/health", ): if selector is None: selector = {} @@ -171,6 +181,11 @@ class Router: cb_window_duration_secs=cb_window_duration_secs, disable_retries=disable_retries, disable_circuit_breaker=disable_circuit_breaker, + health_failure_threshold=health_failure_threshold, + health_success_threshold=health_success_threshold, + health_check_timeout_secs=health_check_timeout_secs, + health_check_interval_secs=health_check_interval_secs, + health_check_endpoint=health_check_endpoint, ) def start(self) -> None: diff --git a/sgl-router/src/config/types.rs b/sgl-router/src/config/types.rs index 60452440b..4576f9033 100644 --- a/sgl-router/src/config/types.rs +++ b/sgl-router/src/config/types.rs @@ -49,6 +49,8 @@ pub struct RouterConfig { /// Disable circuit breaker (overrides circuit_breaker.failure_threshold to u32::MAX when true) #[serde(default)] pub disable_circuit_breaker: bool, + /// Health check configuration + pub health_check: HealthCheckConfig, } /// Routing mode configuration @@ -183,7 +185,7 @@ impl Default for DiscoveryConfig { enabled: false, namespace: None, port: 8000, - check_interval_secs: 60, + check_interval_secs: 120, selector: HashMap::new(), prefill_selector: HashMap::new(), decode_selector: HashMap::new(), @@ -212,17 +214,44 @@ pub struct RetryConfig { impl Default for RetryConfig { fn default() -> Self { Self { - max_retries: 3, - initial_backoff_ms: 100, - max_backoff_ms: 10000, - backoff_multiplier: 2.0, - jitter_factor: 0.1, + max_retries: 5, + initial_backoff_ms: 50, + max_backoff_ms: 30000, + backoff_multiplier: 1.5, + jitter_factor: 0.2, } } } fn default_retry_jitter_factor() -> f32 { - 0.1 + 0.2 +} + +/// Health check configuration for worker monitoring +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HealthCheckConfig { + /// Number of consecutive failures before marking unhealthy + pub failure_threshold: u32, + /// Number of consecutive successes before marking healthy + pub success_threshold: u32, + /// Timeout for health check requests in seconds + pub timeout_secs: u64, + /// Interval between health checks in seconds + pub check_interval_secs: u64, + /// Health check endpoint path + pub endpoint: String, +} + +impl Default for HealthCheckConfig { + fn default() -> Self { + Self { + failure_threshold: 3, + success_threshold: 2, + timeout_secs: 5, + check_interval_secs: 60, + endpoint: "/health".to_string(), + } + } } /// Circuit breaker configuration for worker reliability @@ -241,10 +270,10 @@ pub struct CircuitBreakerConfig { impl Default for CircuitBreakerConfig { fn default() -> Self { Self { - failure_threshold: 5, - success_threshold: 2, - timeout_duration_secs: 30, - window_duration_secs: 60, + failure_threshold: 10, + success_threshold: 3, + timeout_duration_secs: 60, + window_duration_secs: 120, } } } @@ -276,10 +305,10 @@ impl Default for RouterConfig { policy: PolicyConfig::Random, host: "127.0.0.1".to_string(), port: 3001, - max_payload_size: 268_435_456, // 256MB - request_timeout_secs: 3600, // 1 hour to match Python mini LB - worker_startup_timeout_secs: 300, - worker_startup_check_interval_secs: 10, + max_payload_size: 536_870_912, // 512MB + request_timeout_secs: 1800, // 30 minutes + worker_startup_timeout_secs: 600, + worker_startup_check_interval_secs: 30, dp_aware: false, api_key: None, discovery: None, @@ -287,12 +316,13 @@ impl Default for RouterConfig { log_dir: None, log_level: None, request_id_headers: None, - max_concurrent_requests: 64, + max_concurrent_requests: 256, cors_allowed_origins: vec![], retry: RetryConfig::default(), circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), } } } @@ -365,10 +395,10 @@ mod tests { assert!(matches!(config.policy, PolicyConfig::Random)); assert_eq!(config.host, "127.0.0.1"); assert_eq!(config.port, 3001); - assert_eq!(config.max_payload_size, 268_435_456); - assert_eq!(config.request_timeout_secs, 3600); - assert_eq!(config.worker_startup_timeout_secs, 300); - assert_eq!(config.worker_startup_check_interval_secs, 10); + assert_eq!(config.max_payload_size, 536_870_912); + assert_eq!(config.request_timeout_secs, 1800); + assert_eq!(config.worker_startup_timeout_secs, 600); + assert_eq!(config.worker_startup_check_interval_secs, 30); assert!(config.discovery.is_none()); assert!(config.metrics.is_none()); assert!(config.log_dir.is_none()); @@ -425,6 +455,7 @@ mod tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), }; let json = serde_json::to_string(&config).unwrap(); @@ -614,7 +645,7 @@ mod tests { assert!(!config.enabled); assert!(config.namespace.is_none()); assert_eq!(config.port, 8000); - assert_eq!(config.check_interval_secs, 60); + assert_eq!(config.check_interval_secs, 120); assert!(config.selector.is_empty()); assert!(config.prefill_selector.is_empty()); assert!(config.decode_selector.is_empty()); @@ -856,6 +887,7 @@ mod tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), }; assert!(config.mode.is_pd_mode()); @@ -911,6 +943,7 @@ mod tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), }; assert!(!config.mode.is_pd_mode()); @@ -962,6 +995,7 @@ mod tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), }; assert!(config.has_service_discovery()); diff --git a/sgl-router/src/core/mod.rs b/sgl-router/src/core/mod.rs index 727cf3515..101578119 100644 --- a/sgl-router/src/core/mod.rs +++ b/sgl-router/src/core/mod.rs @@ -18,6 +18,6 @@ pub use circuit_breaker::{ pub use error::{WorkerError, WorkerResult}; pub use retry::{is_retryable_status, BackoffCalculator, RetryError, RetryExecutor}; pub use worker::{ - start_health_checker, BasicWorker, DPAwareWorker, HealthChecker, Worker, WorkerCollection, - WorkerFactory, WorkerLoadGuard, WorkerType, + start_health_checker, BasicWorker, DPAwareWorker, HealthChecker, HealthConfig, Worker, + WorkerCollection, WorkerFactory, WorkerLoadGuard, WorkerType, }; diff --git a/sgl-router/src/core/worker.rs b/sgl-router/src/core/worker.rs index bfe36e321..49cba8ccf 100644 --- a/sgl-router/src/core/worker.rs +++ b/sgl-router/src/core/worker.rs @@ -182,6 +182,10 @@ pub struct HealthConfig { pub check_interval_secs: u64, /// Health check endpoint path pub endpoint: String, + /// Number of consecutive failures before marking unhealthy + pub failure_threshold: u32, + /// Number of consecutive successes before marking healthy + pub success_threshold: u32, } impl Default for HealthConfig { @@ -190,6 +194,8 @@ impl Default for HealthConfig { timeout_secs: 5, check_interval_secs: 30, endpoint: "/health".to_string(), + failure_threshold: 3, + success_threshold: 2, } } } @@ -214,6 +220,8 @@ pub struct BasicWorker { load_counter: Arc, processed_counter: Arc, healthy: Arc, + consecutive_failures: Arc, + consecutive_successes: Arc, circuit_breaker: CircuitBreaker, } @@ -231,6 +239,8 @@ impl BasicWorker { load_counter: Arc::new(AtomicUsize::new(0)), processed_counter: Arc::new(AtomicUsize::new(0)), healthy: Arc::new(AtomicBool::new(true)), + consecutive_failures: Arc::new(AtomicUsize::new(0)), + consecutive_successes: Arc::new(AtomicUsize::new(0)), circuit_breaker: CircuitBreaker::new(), } } @@ -300,26 +310,47 @@ impl Worker for BasicWorker { let timeout = Duration::from_secs(self.metadata.health_config.timeout_secs); // Use the shared client with a custom timeout for this request - match WORKER_CLIENT.get(&health_url).timeout(timeout).send().await { + let health_result = match WORKER_CLIENT.get(&health_url).timeout(timeout).send().await { Ok(response) => { if response.status().is_success() { - self.set_healthy(true); - Ok(()) + true } else { - self.set_healthy(false); - Err(WorkerError::HealthCheckFailed { - url: url.to_string(), - reason: format!("Health check returned status: {}", response.status()), - }) + false } } - Err(e) => { - self.set_healthy(false); - Err(WorkerError::HealthCheckFailed { - url: url.to_string(), - reason: format!("Health check request failed: {}", e), - }) + Err(_) => false, + }; + + if health_result { + // Health check succeeded + self.consecutive_failures.store(0, Ordering::Release); + let successes = self.consecutive_successes.fetch_add(1, Ordering::AcqRel) + 1; + + // Mark healthy if we've reached the success threshold + if !self.is_healthy() + && successes >= self.metadata.health_config.success_threshold as usize + { + self.set_healthy(true); + self.consecutive_successes.store(0, Ordering::Release); } + Ok(()) + } else { + // Health check failed + self.consecutive_successes.store(0, Ordering::Release); + let failures = self.consecutive_failures.fetch_add(1, Ordering::AcqRel) + 1; + + // Mark unhealthy if we've reached the failure threshold + if self.is_healthy() + && failures >= self.metadata.health_config.failure_threshold as usize + { + self.set_healthy(false); + self.consecutive_failures.store(0, Ordering::Release); + } + + Err(WorkerError::HealthCheckFailed { + url: url.to_string(), + reason: format!("Health check failed (consecutive failures: {})", failures), + }) } } @@ -408,43 +439,8 @@ impl Worker for DPAwareWorker { } async fn check_health_async(&self) -> WorkerResult<()> { - // Use base URL for health checks - let health_url = format!("{}/health", self.base_url); - let timeout = - std::time::Duration::from_secs(self.base_worker.metadata.health_config.timeout_secs); - - let health_result = async { - let response = WORKER_CLIENT - .get(&health_url) - .timeout(timeout) - .send() - .await - .map_err(|e| format!("Health check request failed: {}", e))?; - - if response.status().is_success() { - Ok(()) - } else { - Err(format!( - "Health check returned status: {}", - response.status() - )) - } - } - .await; - - match health_result { - Ok(()) => { - self.set_healthy(true); - Ok(()) - } - Err(reason) => { - self.set_healthy(false); - Err(WorkerError::HealthCheckFailed { - url: self.base_url.clone(), - reason, - }) - } - } + // Delegate to the base worker's health check logic + self.base_worker.check_health_async().await } fn load(&self) -> usize { @@ -951,6 +947,8 @@ mod tests { assert_eq!(config.timeout_secs, 5); assert_eq!(config.check_interval_secs, 30); assert_eq!(config.endpoint, "/health"); + assert_eq!(config.failure_threshold, 3); + assert_eq!(config.success_threshold, 2); } #[test] @@ -959,10 +957,14 @@ mod tests { timeout_secs: 10, check_interval_secs: 60, endpoint: "/healthz".to_string(), + failure_threshold: 5, + success_threshold: 3, }; assert_eq!(config.timeout_secs, 10); assert_eq!(config.check_interval_secs, 60); assert_eq!(config.endpoint, "/healthz"); + assert_eq!(config.failure_threshold, 5); + assert_eq!(config.success_threshold, 3); } // Test BasicWorker @@ -994,6 +996,8 @@ mod tests { timeout_secs: 15, check_interval_secs: 45, endpoint: "/custom-health".to_string(), + failure_threshold: 4, + success_threshold: 2, }; let worker = BasicWorker::new("http://test:8080".to_string(), WorkerType::Regular) diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 12d5c1a47..02b626f52 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -72,6 +72,12 @@ struct Router { cb_timeout_duration_secs: u64, cb_window_duration_secs: u64, disable_circuit_breaker: bool, + // Health check configuration + health_failure_threshold: u32, + health_success_threshold: u32, + health_check_timeout_secs: u64, + health_check_interval_secs: u64, + health_check_endpoint: String, } impl Router { @@ -174,6 +180,13 @@ impl Router { }, disable_retries: false, disable_circuit_breaker: false, + health_check: config::HealthCheckConfig { + failure_threshold: self.health_failure_threshold, + success_threshold: self.health_success_threshold, + timeout_secs: self.health_check_timeout_secs, + check_interval_secs: self.health_check_interval_secs, + endpoint: self.health_check_endpoint.clone(), + }, }) } } @@ -186,14 +199,14 @@ impl Router { policy = PolicyType::RoundRobin, host = String::from("127.0.0.1"), port = 3001, - worker_startup_timeout_secs = 300, - worker_startup_check_interval = 10, - cache_threshold = 0.50, - balance_abs_threshold = 32, - balance_rel_threshold = 1.0001, - eviction_interval_secs = 60, - max_tree_size = 2usize.pow(24), - max_payload_size = 256 * 1024 * 1024, // 256MB default for large batches + worker_startup_timeout_secs = 600, + worker_startup_check_interval = 30, + cache_threshold = 0.3, + balance_abs_threshold = 64, + balance_rel_threshold = 1.5, + eviction_interval_secs = 120, + max_tree_size = 2usize.pow(26), + max_payload_size = 512 * 1024 * 1024, // 512MB default for large batches dp_aware = false, api_key = None, log_dir = None, @@ -207,28 +220,34 @@ impl Router { bootstrap_port_annotation = String::from("sglang.ai/bootstrap-port"), prometheus_port = None, prometheus_host = None, - request_timeout_secs = 600, // Add configurable request timeout + request_timeout_secs = 1800, // Add configurable request timeout request_id_headers = None, // Custom request ID headers pd_disaggregation = false, // New flag for PD mode prefill_urls = None, decode_urls = None, prefill_policy = None, decode_policy = None, - max_concurrent_requests = 64, + max_concurrent_requests = 256, cors_allowed_origins = vec![], // Retry defaults - retry_max_retries = 3, - retry_initial_backoff_ms = 100, - retry_max_backoff_ms = 10_000, - retry_backoff_multiplier = 2.0, - retry_jitter_factor = 0.1, + retry_max_retries = 5, + retry_initial_backoff_ms = 50, + retry_max_backoff_ms = 30_000, + retry_backoff_multiplier = 1.5, + retry_jitter_factor = 0.2, disable_retries = false, // Circuit breaker defaults - cb_failure_threshold = 5, - cb_success_threshold = 2, - cb_timeout_duration_secs = 30, - cb_window_duration_secs = 60, + cb_failure_threshold = 10, + cb_success_threshold = 3, + cb_timeout_duration_secs = 60, + cb_window_duration_secs = 120, disable_circuit_breaker = false, + // Health check defaults + health_failure_threshold = 3, + health_success_threshold = 2, + health_check_timeout_secs = 5, + health_check_interval_secs = 60, + health_check_endpoint = String::from("/health"), ))] fn new( worker_urls: Vec, @@ -276,6 +295,11 @@ impl Router { cb_timeout_duration_secs: u64, cb_window_duration_secs: u64, disable_circuit_breaker: bool, + health_failure_threshold: u32, + health_success_threshold: u32, + health_check_timeout_secs: u64, + health_check_interval_secs: u64, + health_check_endpoint: String, ) -> PyResult { Ok(Router { host, @@ -323,6 +347,11 @@ impl Router { cb_timeout_duration_secs, cb_window_duration_secs, disable_circuit_breaker, + health_failure_threshold, + health_success_threshold, + health_check_timeout_secs, + health_check_interval_secs, + health_check_endpoint, }) } diff --git a/sgl-router/src/main.rs b/sgl-router/src/main.rs index 180545942..f1af4f08b 100644 --- a/sgl-router/src/main.rs +++ b/sgl-router/src/main.rs @@ -1,7 +1,7 @@ use clap::{ArgAction, Parser}; use sglang_router_rs::config::{ - CircuitBreakerConfig, ConfigError, ConfigResult, DiscoveryConfig, MetricsConfig, PolicyConfig, - RetryConfig, RouterConfig, RoutingMode, + CircuitBreakerConfig, ConfigError, ConfigResult, DiscoveryConfig, HealthCheckConfig, + MetricsConfig, PolicyConfig, RetryConfig, RouterConfig, RoutingMode, }; use sglang_router_rs::metrics::PrometheusConfig; use sglang_router_rs::server::{self, ServerConfig}; @@ -105,35 +105,35 @@ struct CliArgs { decode_policy: Option, /// Timeout in seconds for worker startup - #[arg(long, default_value_t = 300)] + #[arg(long, default_value_t = 600)] worker_startup_timeout_secs: u64, /// Interval in seconds between checks for worker startup - #[arg(long, default_value_t = 10)] + #[arg(long, default_value_t = 30)] worker_startup_check_interval: u64, /// Cache threshold (0.0-1.0) for cache-aware routing - #[arg(long, default_value_t = 0.5)] + #[arg(long, default_value_t = 0.3)] cache_threshold: f32, /// Absolute threshold for load balancing - #[arg(long, default_value_t = 32)] + #[arg(long, default_value_t = 64)] balance_abs_threshold: usize, /// Relative threshold for load balancing - #[arg(long, default_value_t = 1.0001)] + #[arg(long, default_value_t = 1.5)] balance_rel_threshold: f32, /// Interval in seconds between cache eviction operations - #[arg(long, default_value_t = 60)] + #[arg(long, default_value_t = 120)] eviction_interval: u64, /// Maximum size of the approximation tree for cache-aware routing - #[arg(long, default_value_t = 16777216)] // 2^24 + #[arg(long, default_value_t = 67108864)] // 2^26 max_tree_size: usize, /// Maximum payload size in bytes - #[arg(long, default_value_t = 268435456)] // 256MB + #[arg(long, default_value_t = 536870912)] // 512MB max_payload_size: usize, /// Enable data parallelism aware schedule @@ -189,11 +189,11 @@ struct CliArgs { request_id_headers: Vec, /// Request timeout in seconds - #[arg(long, default_value_t = 600)] + #[arg(long, default_value_t = 1800)] request_timeout_secs: u64, /// Maximum number of concurrent requests allowed - #[arg(long, default_value_t = 64)] + #[arg(long, default_value_t = 256)] max_concurrent_requests: usize, /// CORS allowed origins @@ -202,23 +202,23 @@ struct CliArgs { // Retry configuration /// Maximum number of retries - #[arg(long, default_value_t = 3)] + #[arg(long, default_value_t = 5)] retry_max_retries: u32, /// Initial backoff in milliseconds for retries - #[arg(long, default_value_t = 100)] + #[arg(long, default_value_t = 50)] retry_initial_backoff_ms: u64, /// Maximum backoff in milliseconds for retries - #[arg(long, default_value_t = 10000)] + #[arg(long, default_value_t = 30000)] retry_max_backoff_ms: u64, /// Backoff multiplier for exponential backoff - #[arg(long, default_value_t = 2.0)] + #[arg(long, default_value_t = 1.5)] retry_backoff_multiplier: f32, /// Jitter factor for retry backoff - #[arg(long, default_value_t = 0.1)] + #[arg(long, default_value_t = 0.2)] retry_jitter_factor: f32, /// Disable retries @@ -227,24 +227,45 @@ struct CliArgs { // Circuit breaker configuration /// Number of failures before circuit breaker opens - #[arg(long, default_value_t = 5)] + #[arg(long, default_value_t = 10)] cb_failure_threshold: u32, /// Number of successes before circuit breaker closes - #[arg(long, default_value_t = 2)] + #[arg(long, default_value_t = 3)] cb_success_threshold: u32, /// Timeout duration in seconds for circuit breaker - #[arg(long, default_value_t = 30)] + #[arg(long, default_value_t = 60)] cb_timeout_duration_secs: u64, /// Window duration in seconds for circuit breaker - #[arg(long, default_value_t = 60)] + #[arg(long, default_value_t = 120)] cb_window_duration_secs: u64, /// Disable circuit breaker #[arg(long, default_value_t = false)] disable_circuit_breaker: bool, + + // Health check configuration + /// Number of consecutive health check failures before marking worker unhealthy + #[arg(long, default_value_t = 3)] + health_failure_threshold: u32, + + /// Number of consecutive health check successes before marking worker healthy + #[arg(long, default_value_t = 2)] + health_success_threshold: u32, + + /// Timeout in seconds for health check requests + #[arg(long, default_value_t = 5)] + health_check_timeout_secs: u64, + + /// Interval in seconds between runtime health checks + #[arg(long, default_value_t = 60)] + health_check_interval_secs: u64, + + /// Health check endpoint path + #[arg(long, default_value = "/health")] + health_check_endpoint: String, } impl CliArgs { @@ -378,6 +399,13 @@ impl CliArgs { }, disable_retries: self.disable_retries, disable_circuit_breaker: self.disable_circuit_breaker, + health_check: HealthCheckConfig { + failure_threshold: self.health_failure_threshold, + success_threshold: self.health_success_threshold, + timeout_secs: self.health_check_timeout_secs, + check_interval_secs: self.health_check_interval_secs, + endpoint: self.health_check_endpoint.clone(), + }, }) } diff --git a/sgl-router/src/routers/factory.rs b/sgl-router/src/routers/factory.rs index 78dbb932e..a96e89b27 100644 --- a/sgl-router/src/routers/factory.rs +++ b/sgl-router/src/routers/factory.rs @@ -55,6 +55,7 @@ impl RouterFactory { ctx.router_config.api_key.clone(), ctx.router_config.retry.clone(), ctx.router_config.circuit_breaker.clone(), + ctx.router_config.health_check.clone(), ) .await?; @@ -87,6 +88,7 @@ impl RouterFactory { ctx.router_config.worker_startup_check_interval_secs, ctx.router_config.retry.clone(), ctx.router_config.circuit_breaker.clone(), + ctx.router_config.health_check.clone(), ) .await?; diff --git a/sgl-router/src/routers/pd_router.rs b/sgl-router/src/routers/pd_router.rs index 729bca0e7..0c6cc6158 100644 --- a/sgl-router/src/routers/pd_router.rs +++ b/sgl-router/src/routers/pd_router.rs @@ -1,10 +1,13 @@ // PD (Prefill-Decode) Router Implementation // This module handles routing for disaggregated prefill-decode systems use super::pd_types::{api_path, PDRouterError}; -use crate::config::types::{CircuitBreakerConfig as ConfigCircuitBreakerConfig, RetryConfig}; +use crate::config::types::{ + CircuitBreakerConfig as ConfigCircuitBreakerConfig, + HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, +}; use crate::core::{ - is_retryable_status, CircuitBreakerConfig, HealthChecker, RetryExecutor, Worker, WorkerFactory, - WorkerLoadGuard, + is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, + RetryExecutor, Worker, WorkerFactory, WorkerLoadGuard, WorkerType, }; use crate::metrics::RouterMetrics; use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest}; @@ -360,6 +363,7 @@ impl PDRouter { interval_secs: u64, retry_config: RetryConfig, circuit_breaker_config: ConfigCircuitBreakerConfig, + health_check_config: ConfigHealthCheckConfig, ) -> Result { // Convert config CircuitBreakerConfig to core CircuitBreakerConfig let core_cb_config = CircuitBreakerConfig { @@ -369,17 +373,42 @@ impl PDRouter { window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs), }; - // Convert URLs to Worker trait objects + // Convert URLs to Worker trait objects with health check config let prefill_workers: Vec> = prefill_urls .into_iter() .map(|(url, port)| { - WorkerFactory::create_prefill_with_config(url, port, core_cb_config.clone()) + let worker = BasicWorker::new( + url, + WorkerType::Prefill { + bootstrap_port: port, + }, + ) + .with_circuit_breaker_config(core_cb_config.clone()) + .with_health_config(HealthConfig { + timeout_secs: health_check_config.timeout_secs, + check_interval_secs: health_check_config.check_interval_secs, + endpoint: health_check_config.endpoint.clone(), + failure_threshold: health_check_config.failure_threshold, + success_threshold: health_check_config.success_threshold, + }); + Box::new(worker) as Box }) .collect(); let decode_workers: Vec> = decode_urls .into_iter() - .map(|url| WorkerFactory::create_decode_with_config(url, core_cb_config.clone())) + .map(|url| { + let worker = BasicWorker::new(url, WorkerType::Decode) + .with_circuit_breaker_config(core_cb_config.clone()) + .with_health_config(HealthConfig { + timeout_secs: health_check_config.timeout_secs, + check_interval_secs: health_check_config.check_interval_secs, + endpoint: health_check_config.endpoint.clone(), + failure_threshold: health_check_config.failure_threshold, + success_threshold: health_check_config.success_threshold, + }); + Box::new(worker) as Box + }) .collect(); // Wait for PD workers to be healthy (skip if empty - for service discovery mode) @@ -443,10 +472,14 @@ impl PDRouter { let decode_workers = Arc::new(RwLock::new(decode_workers)); // Start health checkers for both worker pools - let prefill_health_checker = - crate::core::start_health_checker(Arc::clone(&prefill_workers), interval_secs); - let decode_health_checker = - crate::core::start_health_checker(Arc::clone(&decode_workers), interval_secs); + let prefill_health_checker = crate::core::start_health_checker( + Arc::clone(&prefill_workers), + health_check_config.check_interval_secs, + ); + let decode_health_checker = crate::core::start_health_checker( + Arc::clone(&decode_workers), + health_check_config.check_interval_secs, + ); // Build a dedicated prefill client for fire-and-forget semantics let prefill_client = reqwest::Client::builder() diff --git a/sgl-router/src/routers/router.rs b/sgl-router/src/routers/router.rs index 023607ac1..2e7e41703 100644 --- a/sgl-router/src/routers/router.rs +++ b/sgl-router/src/routers/router.rs @@ -1,6 +1,10 @@ -use crate::config::types::{CircuitBreakerConfig as ConfigCircuitBreakerConfig, RetryConfig}; +use crate::config::types::{ + CircuitBreakerConfig as ConfigCircuitBreakerConfig, + HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, +}; use crate::core::{ - is_retryable_status, CircuitBreakerConfig, HealthChecker, RetryExecutor, Worker, WorkerFactory, + is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, + RetryExecutor, Worker, WorkerFactory, WorkerType, }; use crate::metrics::RouterMetrics; use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest}; @@ -61,6 +65,7 @@ impl Router { api_key: Option, retry_config: RetryConfig, circuit_breaker_config: ConfigCircuitBreakerConfig, + health_check_config: ConfigHealthCheckConfig, ) -> Result { // Update active workers gauge RouterMetrics::set_active_workers(worker_urls.len()); @@ -86,11 +91,20 @@ impl Router { window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs), }; - // Create Worker trait objects from URLs + // Create Worker trait objects from URLs with health check config let workers: Vec> = worker_urls .iter() .map(|url| { - WorkerFactory::create_regular_with_config(url.clone(), core_cb_config.clone()) + let worker = BasicWorker::new(url.clone(), WorkerType::Regular) + .with_circuit_breaker_config(core_cb_config.clone()) + .with_health_config(HealthConfig { + timeout_secs: health_check_config.timeout_secs, + check_interval_secs: health_check_config.check_interval_secs, + endpoint: health_check_config.endpoint.clone(), + failure_threshold: health_check_config.failure_threshold, + success_threshold: health_check_config.success_threshold, + }); + Box::new(worker) as Box }) .collect(); diff --git a/sgl-router/src/service_discovery.rs b/sgl-router/src/service_discovery.rs index d017756ff..1d57fe5d3 100644 --- a/sgl-router/src/service_discovery.rs +++ b/sgl-router/src/service_discovery.rs @@ -592,6 +592,7 @@ mod tests { None, crate::config::types::RetryConfig::default(), crate::config::types::CircuitBreakerConfig::default(), + crate::config::types::HealthCheckConfig::default(), ) .await .unwrap(); diff --git a/sgl-router/tests/api_endpoints_test.rs b/sgl-router/tests/api_endpoints_test.rs index 1cadead56..284fefb01 100644 --- a/sgl-router/tests/api_endpoints_test.rs +++ b/sgl-router/tests/api_endpoints_test.rs @@ -50,6 +50,7 @@ impl TestContext { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; Self::new_with_config(config, worker_configs).await @@ -1091,6 +1092,7 @@ mod error_tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; let ctx = TestContext::new_with_config( @@ -1441,6 +1443,7 @@ mod pd_mode_tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; // Create app context @@ -1595,6 +1598,7 @@ mod request_id_tests { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; let ctx = TestContext::new_with_config( diff --git a/sgl-router/tests/request_formats_test.rs b/sgl-router/tests/request_formats_test.rs index be04a9103..431e944d3 100644 --- a/sgl-router/tests/request_formats_test.rs +++ b/sgl-router/tests/request_formats_test.rs @@ -41,6 +41,7 @@ impl TestContext { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; let mut workers = Vec::new(); diff --git a/sgl-router/tests/streaming_tests.rs b/sgl-router/tests/streaming_tests.rs index bb0090daa..4674593b8 100644 --- a/sgl-router/tests/streaming_tests.rs +++ b/sgl-router/tests/streaming_tests.rs @@ -42,6 +42,7 @@ impl TestContext { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; let mut workers = Vec::new(); diff --git a/sgl-router/tests/test_pd_routing.rs b/sgl-router/tests/test_pd_routing.rs index 72b1697cf..6035da0a8 100644 --- a/sgl-router/tests/test_pd_routing.rs +++ b/sgl-router/tests/test_pd_routing.rs @@ -184,6 +184,7 @@ mod test_pd_routing { circuit_breaker: CircuitBreakerConfig::default(), disable_retries: false, disable_circuit_breaker: false, + health_check: sglang_router_rs::config::HealthCheckConfig::default(), }; // Router creation will fail due to health checks, but config should be valid