From 4f8a982d52b57a8b65c92a895dfe39fbe4d5b1ad Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Thu, 4 Sep 2025 00:35:51 -0400 Subject: [PATCH] [router] clean up dependency injector to use ctx (#10000) --- sgl-router/src/routers/factory.rs | 106 ++--------------------- sgl-router/src/routers/grpc/pd_router.rs | 75 ++++++++-------- sgl-router/src/routers/grpc/router.rs | 58 +++++++------ sgl-router/src/routers/http/pd_router.rs | 58 ++++++------- sgl-router/src/routers/http/router.rs | 53 +++++------- sgl-router/src/service_discovery.rs | 32 +++---- 6 files changed, 147 insertions(+), 235 deletions(-) diff --git a/sgl-router/src/routers/factory.rs b/sgl-router/src/routers/factory.rs index a297a7ede..05bb459de 100644 --- a/sgl-router/src/routers/factory.rs +++ b/sgl-router/src/routers/factory.rs @@ -83,20 +83,8 @@ impl RouterFactory { // Create policy let policy = PolicyFactory::create_from_config(policy_config); - // Create regular router with injected policy and client - let router = Router::new( - worker_urls.to_vec(), - policy, - ctx.client.clone(), - ctx.router_config.worker_startup_timeout_secs, - ctx.router_config.worker_startup_check_interval_secs, - ctx.router_config.dp_aware, - ctx.router_config.api_key.clone(), - ctx.router_config.retry.clone(), - ctx.router_config.circuit_breaker.clone(), - ctx.router_config.health_check.clone(), - ) - .await?; + // Create regular router with injected policy and context + let router = Router::new(worker_urls.to_vec(), policy, ctx).await?; Ok(Box::new(router)) } @@ -116,19 +104,13 @@ impl RouterFactory { let decode_policy = PolicyFactory::create_from_config(decode_policy_config.unwrap_or(main_policy_config)); - // Create PD router with separate policies and client + // Create PD router with separate policies and context let router = PDRouter::new( prefill_urls.to_vec(), decode_urls.to_vec(), prefill_policy, decode_policy, - ctx.client.clone(), - ctx.router_config.request_timeout_secs, - ctx.router_config.worker_startup_timeout_secs, - ctx.router_config.worker_startup_check_interval_secs, - ctx.router_config.retry.clone(), - ctx.router_config.circuit_breaker.clone(), - ctx.router_config.health_check.clone(), + ctx, ) .await?; @@ -146,46 +128,8 @@ impl RouterFactory { // Create policy let policy = PolicyFactory::create_from_config(policy_config); - // Get tokenizer from context - let tokenizer = ctx - .tokenizer - .as_ref() - .ok_or_else(|| { - "gRPC router requires tokenizer to be initialized in AppContext".to_string() - })? - .clone(); - - // Get reasoning parser factory from context - let reasoning_parser_factory = ctx - .reasoning_parser_factory - .as_ref() - .ok_or_else(|| { - "gRPC router requires reasoning parser factory to be initialized in AppContext" - .to_string() - })? - .clone(); - - // Get tool parser registry from context - let tool_parser_registry = ctx.tool_parser_registry.ok_or_else(|| { - "gRPC router requires tool parser registry to be initialized in AppContext".to_string() - })?; - - // Create gRPC router - let router = GrpcRouter::new( - worker_urls.to_vec(), - policy, - ctx.router_config.worker_startup_timeout_secs, - ctx.router_config.worker_startup_check_interval_secs, - ctx.router_config.dp_aware, - ctx.router_config.api_key.clone(), - ctx.router_config.effective_retry_config(), - ctx.router_config.effective_circuit_breaker_config(), - ctx.router_config.health_check.clone(), - tokenizer, - reasoning_parser_factory, - tool_parser_registry, - ) - .await?; + // Create gRPC router with context + let router = GrpcRouter::new(worker_urls.to_vec(), policy, ctx).await?; Ok(Box::new(router)) } @@ -207,47 +151,13 @@ impl RouterFactory { let decode_policy = PolicyFactory::create_from_config(decode_policy_config.unwrap_or(main_policy_config)); - // Get tokenizer from context - let tokenizer = ctx - .tokenizer - .as_ref() - .ok_or_else(|| { - "gRPC PD router requires tokenizer to be initialized in AppContext".to_string() - })? - .clone(); - - // Get reasoning parser factory from context - let reasoning_parser_factory = ctx - .reasoning_parser_factory - .as_ref() - .ok_or_else(|| { - "gRPC PD router requires reasoning parser factory to be initialized in AppContext" - .to_string() - })? - .clone(); - - // Get tool parser registry from context - let tool_parser_registry = ctx.tool_parser_registry.ok_or_else(|| { - "gRPC PD router requires tool parser registry to be initialized in AppContext" - .to_string() - })?; - - // Create gRPC PD router + // Create gRPC PD router with context let router = GrpcPDRouter::new( prefill_urls.to_vec(), decode_urls.to_vec(), prefill_policy, decode_policy, - ctx.router_config.worker_startup_timeout_secs, - ctx.router_config.worker_startup_check_interval_secs, - ctx.router_config.dp_aware, - ctx.router_config.api_key.clone(), - ctx.router_config.effective_retry_config(), - ctx.router_config.effective_circuit_breaker_config(), - ctx.router_config.health_check.clone(), - tokenizer, - reasoning_parser_factory, - tool_parser_registry, + ctx, ) .await?; diff --git a/sgl-router/src/routers/grpc/pd_router.rs b/sgl-router/src/routers/grpc/pd_router.rs index 43d143d81..d227b460d 100644 --- a/sgl-router/src/routers/grpc/pd_router.rs +++ b/sgl-router/src/routers/grpc/pd_router.rs @@ -1,9 +1,6 @@ // PD (Prefill-Decode) gRPC Router Implementation -use crate::config::types::{ - CircuitBreakerConfig as ConfigCircuitBreakerConfig, - HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, -}; +use crate::config::types::RetryConfig; use crate::core::{ BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, Worker, WorkerType, }; @@ -61,27 +58,33 @@ pub struct GrpcPDRouter { impl GrpcPDRouter { /// Create a new gRPC PD router - #[allow(clippy::too_many_arguments)] pub async fn new( prefill_urls: Vec<(String, Option)>, decode_urls: Vec, prefill_policy: Arc, decode_policy: Arc, - timeout_secs: u64, - interval_secs: u64, - dp_aware: bool, - api_key: Option, - retry_config: RetryConfig, - circuit_breaker_config: ConfigCircuitBreakerConfig, - health_check_config: ConfigHealthCheckConfig, - tokenizer: Arc, - reasoning_parser_factory: ParserFactory, - tool_parser_registry: &'static ParserRegistry, + ctx: &Arc, ) -> Result { // Update metrics RouterMetrics::set_active_workers(prefill_urls.len() + decode_urls.len()); + // Extract necessary components from context + let tokenizer = ctx + .tokenizer + .as_ref() + .ok_or_else(|| "gRPC PD router requires tokenizer".to_string())? + .clone(); + let reasoning_parser_factory = ctx + .reasoning_parser_factory + .as_ref() + .ok_or_else(|| "gRPC PD router requires reasoning parser factory".to_string())? + .clone(); + let tool_parser_registry = ctx + .tool_parser_registry + .ok_or_else(|| "gRPC PD router requires tool parser registry".to_string())?; + // Convert config CircuitBreakerConfig to core CircuitBreakerConfig + let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); let core_cb_config = CircuitBreakerConfig { failure_threshold: circuit_breaker_config.failure_threshold, success_threshold: circuit_breaker_config.success_threshold, @@ -138,11 +141,11 @@ impl GrpcPDRouter { ) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }); Box::new(worker) as Box }) @@ -159,11 +162,11 @@ impl GrpcPDRouter { ) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }); Box::new(worker) as Box }) @@ -187,10 +190,14 @@ impl GrpcPDRouter { let prefill_workers = Arc::new(RwLock::new(prefill_workers)); let decode_workers = Arc::new(RwLock::new(decode_workers)); - let prefill_health_checker = - crate::core::start_health_checker(Arc::clone(&prefill_workers), interval_secs); - let decode_health_checker = - crate::core::start_health_checker(Arc::clone(&decode_workers), interval_secs); + let prefill_health_checker = crate::core::start_health_checker( + Arc::clone(&prefill_workers), + ctx.router_config.worker_startup_check_interval_secs, + ); + let decode_health_checker = crate::core::start_health_checker( + Arc::clone(&decode_workers), + ctx.router_config.worker_startup_check_interval_secs, + ); Ok(GrpcPDRouter { prefill_workers, @@ -204,11 +211,11 @@ impl GrpcPDRouter { tool_parser_registry, _prefill_health_checker: Some(prefill_health_checker), _decode_health_checker: Some(decode_health_checker), - timeout_secs, - interval_secs, - dp_aware, - api_key, - retry_config, + timeout_secs: ctx.router_config.worker_startup_timeout_secs, + interval_secs: ctx.router_config.worker_startup_check_interval_secs, + dp_aware: ctx.router_config.dp_aware, + api_key: ctx.router_config.api_key.clone(), + retry_config: ctx.router_config.effective_retry_config(), circuit_breaker_config: core_cb_config, }) } diff --git a/sgl-router/src/routers/grpc/router.rs b/sgl-router/src/routers/grpc/router.rs index be2f5ae33..5c499125f 100644 --- a/sgl-router/src/routers/grpc/router.rs +++ b/sgl-router/src/routers/grpc/router.rs @@ -1,9 +1,6 @@ // gRPC Router Implementation -use crate::config::types::{ - CircuitBreakerConfig as ConfigCircuitBreakerConfig, - HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, -}; +use crate::config::types::RetryConfig; use crate::core::{ BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, Worker, WorkerType, }; @@ -54,25 +51,31 @@ pub struct GrpcRouter { impl GrpcRouter { /// Create a new gRPC router - #[allow(clippy::too_many_arguments)] pub async fn new( worker_urls: Vec, policy: Arc, - timeout_secs: u64, - interval_secs: u64, - dp_aware: bool, - api_key: Option, - retry_config: RetryConfig, - circuit_breaker_config: ConfigCircuitBreakerConfig, - health_check_config: ConfigHealthCheckConfig, - tokenizer: Arc, - reasoning_parser_factory: ParserFactory, - tool_parser_registry: &'static ParserRegistry, + ctx: &Arc, ) -> Result { // Update metrics RouterMetrics::set_active_workers(worker_urls.len()); + // Extract necessary components from context + let tokenizer = ctx + .tokenizer + .as_ref() + .ok_or_else(|| "gRPC router requires tokenizer".to_string())? + .clone(); + let reasoning_parser_factory = ctx + .reasoning_parser_factory + .as_ref() + .ok_or_else(|| "gRPC router requires reasoning parser factory".to_string())? + .clone(); + let tool_parser_registry = ctx + .tool_parser_registry + .ok_or_else(|| "gRPC router requires tool parser registry".to_string())?; + // Convert config CircuitBreakerConfig to core CircuitBreakerConfig + let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); let core_cb_config = CircuitBreakerConfig { failure_threshold: circuit_breaker_config.failure_threshold, success_threshold: circuit_breaker_config.success_threshold, @@ -112,11 +115,11 @@ impl GrpcRouter { ) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }) .with_grpc_client(client); @@ -135,7 +138,10 @@ impl GrpcRouter { } let workers = Arc::new(RwLock::new(workers)); - let health_checker = crate::core::start_health_checker(Arc::clone(&workers), interval_secs); + let health_checker = crate::core::start_health_checker( + Arc::clone(&workers), + ctx.router_config.worker_startup_check_interval_secs, + ); Ok(GrpcRouter { workers, @@ -145,11 +151,11 @@ impl GrpcRouter { reasoning_parser_factory, tool_parser_registry, _health_checker: Some(health_checker), - timeout_secs, - interval_secs, - dp_aware, - api_key, - retry_config, + timeout_secs: ctx.router_config.worker_startup_timeout_secs, + interval_secs: ctx.router_config.worker_startup_check_interval_secs, + dp_aware: ctx.router_config.dp_aware, + api_key: ctx.router_config.api_key.clone(), + retry_config: ctx.router_config.effective_retry_config(), circuit_breaker_config: core_cb_config, }) } diff --git a/sgl-router/src/routers/http/pd_router.rs b/sgl-router/src/routers/http/pd_router.rs index beb40e45e..528ead5f5 100644 --- a/sgl-router/src/routers/http/pd_router.rs +++ b/sgl-router/src/routers/http/pd_router.rs @@ -1,10 +1,7 @@ // PD (Prefill-Decode) Router Implementation // This module handles routing for disaggregated prefill-decode systems use super::pd_types::{api_path, PDRouterError}; -use crate::config::types::{ - CircuitBreakerConfig as ConfigCircuitBreakerConfig, - HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, -}; +use crate::config::types::RetryConfig; use crate::core::{ is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, RetryExecutor, Worker, WorkerFactory, WorkerLoadGuard, WorkerType, @@ -375,15 +372,10 @@ impl PDRouter { decode_urls: Vec, prefill_policy: Arc, decode_policy: Arc, - client: Client, - prefill_request_timeout_secs: u64, - worker_startup_timeout_secs: u64, - worker_startup_check_interval_secs: u64, - retry_config: RetryConfig, - circuit_breaker_config: ConfigCircuitBreakerConfig, - health_check_config: ConfigHealthCheckConfig, + ctx: &Arc, ) -> Result { // Convert config CircuitBreakerConfig to core CircuitBreakerConfig + let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); let core_cb_config = CircuitBreakerConfig { failure_threshold: circuit_breaker_config.failure_threshold, success_threshold: circuit_breaker_config.success_threshold, @@ -403,11 +395,11 @@ impl PDRouter { ) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }); Box::new(worker) as Box }) @@ -419,11 +411,11 @@ impl PDRouter { let worker = BasicWorker::new(url, WorkerType::Decode) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }); Box::new(worker) as Box }) @@ -438,8 +430,8 @@ impl PDRouter { if !all_urls.is_empty() { crate::routers::http::router::Router::wait_for_healthy_workers( &all_urls, - worker_startup_timeout_secs, - worker_startup_check_interval_secs, + ctx.router_config.worker_startup_timeout_secs, + ctx.router_config.worker_startup_check_interval_secs, ) .await?; } @@ -466,8 +458,8 @@ impl PDRouter { let load_monitor_handle = if prefill_policy.name() == "power_of_two" || decode_policy.name() == "power_of_two" { let monitor_urls = all_urls.clone(); - let monitor_interval = worker_startup_check_interval_secs; - let monitor_client = client.clone(); + let monitor_interval = ctx.router_config.worker_startup_check_interval_secs; + let monitor_client = ctx.client.clone(); let prefill_policy_clone = Arc::clone(&prefill_policy); let decode_policy_clone = Arc::clone(&decode_policy); @@ -492,11 +484,11 @@ impl PDRouter { // Start health checkers for both worker pools let prefill_health_checker = crate::core::start_health_checker( Arc::clone(&prefill_workers), - health_check_config.check_interval_secs, + ctx.router_config.health_check.check_interval_secs, ); let decode_health_checker = crate::core::start_health_checker( Arc::clone(&decode_workers), - health_check_config.check_interval_secs, + ctx.router_config.health_check.check_interval_secs, ); // Build a dedicated prefill client for fire-and-forget semantics @@ -504,7 +496,7 @@ impl PDRouter { .pool_max_idle_per_host(0) .http1_only() .connect_timeout(Duration::from_millis(300)) - .timeout(Duration::from_secs(prefill_request_timeout_secs)) + .timeout(Duration::from_secs(ctx.router_config.request_timeout_secs)) .build() .map_err(|e| format!("Failed to build prefill client: {}", e))?; @@ -582,14 +574,16 @@ impl PDRouter { decode_workers, prefill_policy, decode_policy, - worker_startup_timeout_secs, - worker_startup_check_interval_secs, + worker_startup_timeout_secs: ctx.router_config.worker_startup_timeout_secs, + worker_startup_check_interval_secs: ctx + .router_config + .worker_startup_check_interval_secs, worker_loads, load_monitor_handle, - client, + client: ctx.client.clone(), prefill_client, prefill_drain_tx, - retry_config, + retry_config: ctx.router_config.effective_retry_config(), circuit_breaker_config: core_cb_config, _prefill_health_checker: Some(prefill_health_checker), _decode_health_checker: Some(decode_health_checker), diff --git a/sgl-router/src/routers/http/router.rs b/sgl-router/src/routers/http/router.rs index 963bef4aa..176c38602 100644 --- a/sgl-router/src/routers/http/router.rs +++ b/sgl-router/src/routers/http/router.rs @@ -1,7 +1,4 @@ -use crate::config::types::{ - CircuitBreakerConfig as ConfigCircuitBreakerConfig, - HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig, -}; +use crate::config::types::RetryConfig; use crate::core::{ is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig, RetryExecutor, Worker, WorkerFactory, WorkerType, @@ -51,14 +48,7 @@ impl Router { pub async fn new( worker_urls: Vec, policy: Arc, - client: Client, - worker_startup_timeout_secs: u64, - worker_startup_check_interval_secs: u64, - dp_aware: bool, - api_key: Option, - retry_config: RetryConfig, - circuit_breaker_config: ConfigCircuitBreakerConfig, - health_check_config: ConfigHealthCheckConfig, + ctx: &Arc, ) -> Result { // Update active workers gauge RouterMetrics::set_active_workers(worker_urls.len()); @@ -67,21 +57,22 @@ impl Router { if !worker_urls.is_empty() { Self::wait_for_healthy_workers( &worker_urls, - worker_startup_timeout_secs, - worker_startup_check_interval_secs, + ctx.router_config.worker_startup_timeout_secs, + ctx.router_config.worker_startup_check_interval_secs, ) .await?; } - let worker_urls = if dp_aware { + let worker_urls = if ctx.router_config.dp_aware { // worker address now in the format of "http://host:port@dp_rank" - Self::get_dp_aware_workers(&worker_urls, &api_key) + Self::get_dp_aware_workers(&worker_urls, &ctx.router_config.api_key) .map_err(|e| format!("Failed to get dp-aware workers: {}", e))? } else { worker_urls }; // Convert config CircuitBreakerConfig to core CircuitBreakerConfig + let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); let core_cb_config = CircuitBreakerConfig { failure_threshold: circuit_breaker_config.failure_threshold, success_threshold: circuit_breaker_config.success_threshold, @@ -96,11 +87,11 @@ impl Router { let worker = BasicWorker::new(url.clone(), WorkerType::Regular) .with_circuit_breaker_config(core_cb_config.clone()) .with_health_config(HealthConfig { - timeout_secs: health_check_config.timeout_secs, - check_interval_secs: health_check_config.check_interval_secs, - endpoint: health_check_config.endpoint.clone(), - failure_threshold: health_check_config.failure_threshold, - success_threshold: health_check_config.success_threshold, + timeout_secs: ctx.router_config.health_check.timeout_secs, + check_interval_secs: ctx.router_config.health_check.check_interval_secs, + endpoint: ctx.router_config.health_check.endpoint.clone(), + failure_threshold: ctx.router_config.health_check.failure_threshold, + success_threshold: ctx.router_config.health_check.success_threshold, }); Box::new(worker) as Box }) @@ -117,7 +108,7 @@ impl Router { let workers = Arc::new(RwLock::new(workers)); let health_checker = crate::core::start_health_checker( Arc::clone(&workers), - worker_startup_check_interval_secs, + ctx.router_config.worker_startup_check_interval_secs, ); // Setup load monitoring for PowerOfTwo policy @@ -126,9 +117,9 @@ impl Router { let load_monitor_handle = if policy.name() == "power_of_two" { let monitor_urls = worker_urls.clone(); - let monitor_interval = worker_startup_check_interval_secs; + let monitor_interval = ctx.router_config.worker_startup_check_interval_secs; let policy_clone = Arc::clone(&policy); - let client_clone = client.clone(); + let client_clone = ctx.client.clone(); Some(Arc::new(tokio::spawn(async move { Self::monitor_worker_loads( @@ -147,12 +138,14 @@ impl Router { Ok(Router { workers, policy, - client, - worker_startup_timeout_secs, - worker_startup_check_interval_secs, - dp_aware, - api_key, - retry_config, + client: ctx.client.clone(), + worker_startup_timeout_secs: ctx.router_config.worker_startup_timeout_secs, + worker_startup_check_interval_secs: ctx + .router_config + .worker_startup_check_interval_secs, + dp_aware: ctx.router_config.dp_aware, + api_key: ctx.router_config.api_key.clone(), + retry_config: ctx.router_config.effective_retry_config(), circuit_breaker_config: core_cb_config, _worker_loads: worker_loads, _load_monitor_handle: load_monitor_handle, diff --git a/sgl-router/src/service_discovery.rs b/sgl-router/src/service_discovery.rs index 52cdfdea3..c27317f86 100644 --- a/sgl-router/src/service_discovery.rs +++ b/sgl-router/src/service_discovery.rs @@ -579,25 +579,27 @@ mod tests { // Helper to create a Router instance for testing event handlers async fn create_test_router() -> Arc { - use crate::config::PolicyConfig; + use crate::config::{PolicyConfig, RouterConfig}; + use crate::middleware::TokenBucket; use crate::policies::PolicyFactory; use crate::routers::http::router::Router; + use crate::server::AppContext; + + // Create a minimal RouterConfig for testing + let router_config = RouterConfig::default(); + + // Create AppContext with minimal components + let app_context = Arc::new(AppContext { + client: reqwest::Client::new(), + router_config, + rate_limiter: Arc::new(TokenBucket::new(1000, 1000)), + tokenizer: None, // HTTP mode doesn't need tokenizer + reasoning_parser_factory: None, // HTTP mode doesn't need reasoning parser + tool_parser_registry: None, // HTTP mode doesn't need tool parser + }); let policy = PolicyFactory::create_from_config(&PolicyConfig::Random); - let router = Router::new( - vec![], - policy, - reqwest::Client::new(), - 5, - 1, - false, - None, - crate::config::types::RetryConfig::default(), - crate::config::types::CircuitBreakerConfig::default(), - crate::config::types::HealthCheckConfig::default(), - ) - .await - .unwrap(); + let router = Router::new(vec![], policy, &app_context).await.unwrap(); Arc::new(router) as Arc }