diff --git a/sgl-router/py_src/sglang_router/launch_router.py b/sgl-router/py_src/sglang_router/launch_router.py index 384e3666d..38f1fbba2 100644 --- a/sgl-router/py_src/sglang_router/launch_router.py +++ b/sgl-router/py_src/sglang_router/launch_router.py @@ -34,6 +34,7 @@ class RouterArgs: # Routing policy policy: str = "cache_aware" worker_startup_timeout_secs: int = 300 + worker_startup_check_interval: int = 10 cache_threshold: float = 0.5 balance_abs_threshold: int = 32 balance_rel_threshold: float = 1.0001 @@ -94,6 +95,12 @@ class RouterArgs: default=RouterArgs.worker_startup_timeout_secs, help="Timeout in seconds for worker startup", ) + parser.add_argument( + f"--{prefix}worker-startup-check-interval", + type=int, + default=RouterArgs.worker_startup_check_interval, + help="Interval in seconds between checks for worker startup", + ) parser.add_argument( f"--{prefix}cache-threshold", type=float, @@ -157,6 +164,9 @@ class RouterArgs: worker_startup_timeout_secs=getattr( args, f"{prefix}worker_startup_timeout_secs" ), + worker_startup_check_interval=getattr( + args, f"{prefix}worker_startup_check_interval" + ), cache_threshold=getattr(args, f"{prefix}cache_threshold"), balance_abs_threshold=getattr(args, f"{prefix}balance_abs_threshold"), balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"), @@ -202,6 +212,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]: port=router_args.port, policy=policy_from_str(router_args.policy), worker_startup_timeout_secs=router_args.worker_startup_timeout_secs, + worker_startup_check_interval=router_args.worker_startup_check_interval, cache_threshold=router_args.cache_threshold, balance_abs_threshold=router_args.balance_abs_threshold, balance_rel_threshold=router_args.balance_rel_threshold, diff --git a/sgl-router/py_src/sglang_router/router.py b/sgl-router/py_src/sglang_router/router.py index 1665f8a67..b8757168b 100644 --- a/sgl-router/py_src/sglang_router/router.py +++ b/sgl-router/py_src/sglang_router/router.py @@ -18,6 +18,7 @@ class Router: host: Host address to bind the router server. Default: '127.0.0.1' port: Port number to bind the router server. Default: 3001 worker_startup_timeout_secs: Timeout in seconds for worker startup. Default: 300 + worker_startup_check_interval: Interval in seconds between checks for worker initialization. Default: 10 cache_threshold: Cache threshold (0.0-1.0) for cache-aware routing. Routes to cached worker if the match rate exceeds threshold, otherwise routes to the worker with the smallest tree. Default: 0.5 @@ -39,6 +40,7 @@ class Router: host: str = "127.0.0.1", port: int = 3001, worker_startup_timeout_secs: int = 300, + worker_startup_check_interval: int = 10, cache_threshold: float = 0.50, balance_abs_threshold: int = 32, balance_rel_threshold: float = 1.0001, @@ -53,6 +55,7 @@ class Router: host=host, port=port, worker_startup_timeout_secs=worker_startup_timeout_secs, + worker_startup_check_interval=worker_startup_check_interval, cache_threshold=cache_threshold, balance_abs_threshold=balance_abs_threshold, balance_rel_threshold=balance_rel_threshold, diff --git a/sgl-router/py_test/test_launch_router.py b/sgl-router/py_test/test_launch_router.py index 15549cae7..27ed64d6e 100644 --- a/sgl-router/py_test/test_launch_router.py +++ b/sgl-router/py_test/test_launch_router.py @@ -29,6 +29,7 @@ class TestLaunchRouter(unittest.TestCase): port=30000, policy="cache_aware", worker_startup_timeout_secs=600, + worker_startup_check_interval=10, cache_threshold=0.5, balance_abs_threshold=32, balance_rel_threshold=1.0001, diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 8355f1352..ba9aeac1f 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -18,6 +18,7 @@ struct Router { worker_urls: Vec, policy: PolicyType, worker_startup_timeout_secs: u64, + worker_startup_check_interval: u64, cache_threshold: f32, balance_abs_threshold: usize, balance_rel_threshold: f32, @@ -36,6 +37,7 @@ impl Router { host = String::from("127.0.0.1"), port = 3001, worker_startup_timeout_secs = 300, + worker_startup_check_interval = 10, cache_threshold = 0.50, balance_abs_threshold = 32, balance_rel_threshold = 1.0001, @@ -50,6 +52,7 @@ impl Router { host: String, port: u16, worker_startup_timeout_secs: u64, + worker_startup_check_interval: u64, cache_threshold: f32, balance_abs_threshold: usize, balance_rel_threshold: f32, @@ -64,6 +67,7 @@ impl Router { worker_urls, policy, worker_startup_timeout_secs, + worker_startup_check_interval, cache_threshold, balance_abs_threshold, balance_rel_threshold, @@ -78,12 +82,15 @@ impl Router { let policy_config = match &self.policy { PolicyType::Random => router::PolicyConfig::RandomConfig { timeout_secs: self.worker_startup_timeout_secs, + interval_secs: self.worker_startup_check_interval, }, PolicyType::RoundRobin => router::PolicyConfig::RoundRobinConfig { timeout_secs: self.worker_startup_timeout_secs, + interval_secs: self.worker_startup_check_interval, }, PolicyType::CacheAware => router::PolicyConfig::CacheAwareConfig { timeout_secs: self.worker_startup_timeout_secs, + interval_secs: self.worker_startup_check_interval, cache_threshold: self.cache_threshold, balance_abs_threshold: self.balance_abs_threshold, balance_rel_threshold: self.balance_rel_threshold, diff --git a/sgl-router/src/router.rs b/sgl-router/src/router.rs index 6ea791685..5bbffc74c 100644 --- a/sgl-router/src/router.rs +++ b/sgl-router/src/router.rs @@ -18,10 +18,12 @@ pub enum Router { worker_urls: Arc>>, current_index: AtomicUsize, timeout_secs: u64, + interval_secs: u64, }, Random { worker_urls: Arc>>, timeout_secs: u64, + interval_secs: u64, }, CacheAware { /* @@ -92,6 +94,7 @@ pub enum Router { balance_abs_threshold: usize, balance_rel_threshold: f32, timeout_secs: u64, + interval_secs: u64, _eviction_thread: Option>, }, } @@ -100,9 +103,11 @@ pub enum Router { pub enum PolicyConfig { RandomConfig { timeout_secs: u64, + interval_secs: u64, }, RoundRobinConfig { timeout_secs: u64, + interval_secs: u64, }, CacheAwareConfig { cache_threshold: f32, @@ -111,31 +116,50 @@ pub enum PolicyConfig { eviction_interval_secs: u64, max_tree_size: usize, timeout_secs: u64, + interval_secs: u64, }, } impl Router { pub fn new(worker_urls: Vec, policy_config: PolicyConfig) -> Result { - // Get timeout from policy config - let timeout_secs = match &policy_config { - PolicyConfig::RandomConfig { timeout_secs } => *timeout_secs, - PolicyConfig::RoundRobinConfig { timeout_secs } => *timeout_secs, - PolicyConfig::CacheAwareConfig { timeout_secs, .. } => *timeout_secs, + // Get timeout and interval from policy config + let (timeout_secs, interval_secs) = match &policy_config { + PolicyConfig::RandomConfig { + timeout_secs, + interval_secs, + } => (*timeout_secs, *interval_secs), + PolicyConfig::RoundRobinConfig { + timeout_secs, + interval_secs, + } => (*timeout_secs, *interval_secs), + PolicyConfig::CacheAwareConfig { + timeout_secs, + interval_secs, + .. + } => (*timeout_secs, *interval_secs), }; // Wait until all workers are healthy - Self::wait_for_healthy_workers(&worker_urls, timeout_secs, 10)?; + Self::wait_for_healthy_workers(&worker_urls, timeout_secs, interval_secs)?; // Create router based on policy... Ok(match policy_config { - PolicyConfig::RandomConfig { timeout_secs } => Router::Random { + PolicyConfig::RandomConfig { + timeout_secs, + interval_secs, + } => Router::Random { worker_urls: Arc::new(RwLock::new(worker_urls)), timeout_secs, + interval_secs, }, - PolicyConfig::RoundRobinConfig { timeout_secs } => Router::RoundRobin { + PolicyConfig::RoundRobinConfig { + timeout_secs, + interval_secs, + } => Router::RoundRobin { worker_urls: Arc::new(RwLock::new(worker_urls)), current_index: std::sync::atomic::AtomicUsize::new(0), timeout_secs, + interval_secs, }, PolicyConfig::CacheAwareConfig { cache_threshold, @@ -144,6 +168,7 @@ impl Router { eviction_interval_secs, max_tree_size, timeout_secs, + interval_secs, } => { let mut running_queue = HashMap::new(); for url in &worker_urls { @@ -195,6 +220,7 @@ impl Router { balance_abs_threshold, balance_rel_threshold, timeout_secs, + interval_secs, _eviction_thread: Some(eviction_thread), } } @@ -594,11 +620,22 @@ impl Router { } pub async fn add_worker(&self, worker_url: &str) -> Result { - let interval_secs = 10; // check every 10 seconds - let timeout_secs = match self { - Router::Random { timeout_secs, .. } => *timeout_secs, - Router::RoundRobin { timeout_secs, .. } => *timeout_secs, - Router::CacheAware { timeout_secs, .. } => *timeout_secs, + let (timeout_secs, interval_secs) = match self { + Router::Random { + timeout_secs, + interval_secs, + .. + } => (*timeout_secs, *interval_secs), + Router::RoundRobin { + timeout_secs, + interval_secs, + .. + } => (*timeout_secs, *interval_secs), + Router::CacheAware { + timeout_secs, + interval_secs, + .. + } => (*timeout_secs, *interval_secs), }; let start_time = std::time::Instant::now();