[router] clean up dependency injector to use ctx (#10000)

This commit is contained in:
Simo Lin
2025-09-04 00:35:51 -04:00
committed by GitHub
parent d966b902af
commit 4f8a982d52
6 changed files with 147 additions and 235 deletions

View File

@@ -1,10 +1,7 @@
// PD (Prefill-Decode) Router Implementation
// This module handles routing for disaggregated prefill-decode systems
use super::pd_types::{api_path, PDRouterError};
use crate::config::types::{
CircuitBreakerConfig as ConfigCircuitBreakerConfig,
HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig,
};
use crate::config::types::RetryConfig;
use crate::core::{
is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig,
RetryExecutor, Worker, WorkerFactory, WorkerLoadGuard, WorkerType,
@@ -375,15 +372,10 @@ impl PDRouter {
decode_urls: Vec<String>,
prefill_policy: Arc<dyn LoadBalancingPolicy>,
decode_policy: Arc<dyn LoadBalancingPolicy>,
client: Client,
prefill_request_timeout_secs: u64,
worker_startup_timeout_secs: u64,
worker_startup_check_interval_secs: u64,
retry_config: RetryConfig,
circuit_breaker_config: ConfigCircuitBreakerConfig,
health_check_config: ConfigHealthCheckConfig,
ctx: &Arc<crate::server::AppContext>,
) -> Result<Self, String> {
// Convert config CircuitBreakerConfig to core CircuitBreakerConfig
let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config();
let core_cb_config = CircuitBreakerConfig {
failure_threshold: circuit_breaker_config.failure_threshold,
success_threshold: circuit_breaker_config.success_threshold,
@@ -403,11 +395,11 @@ impl PDRouter {
)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
timeout_secs: ctx.router_config.health_check.timeout_secs,
check_interval_secs: ctx.router_config.health_check.check_interval_secs,
endpoint: ctx.router_config.health_check.endpoint.clone(),
failure_threshold: ctx.router_config.health_check.failure_threshold,
success_threshold: ctx.router_config.health_check.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
@@ -419,11 +411,11 @@ impl PDRouter {
let worker = BasicWorker::new(url, WorkerType::Decode)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
timeout_secs: ctx.router_config.health_check.timeout_secs,
check_interval_secs: ctx.router_config.health_check.check_interval_secs,
endpoint: ctx.router_config.health_check.endpoint.clone(),
failure_threshold: ctx.router_config.health_check.failure_threshold,
success_threshold: ctx.router_config.health_check.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
@@ -438,8 +430,8 @@ impl PDRouter {
if !all_urls.is_empty() {
crate::routers::http::router::Router::wait_for_healthy_workers(
&all_urls,
worker_startup_timeout_secs,
worker_startup_check_interval_secs,
ctx.router_config.worker_startup_timeout_secs,
ctx.router_config.worker_startup_check_interval_secs,
)
.await?;
}
@@ -466,8 +458,8 @@ impl PDRouter {
let load_monitor_handle =
if prefill_policy.name() == "power_of_two" || decode_policy.name() == "power_of_two" {
let monitor_urls = all_urls.clone();
let monitor_interval = worker_startup_check_interval_secs;
let monitor_client = client.clone();
let monitor_interval = ctx.router_config.worker_startup_check_interval_secs;
let monitor_client = ctx.client.clone();
let prefill_policy_clone = Arc::clone(&prefill_policy);
let decode_policy_clone = Arc::clone(&decode_policy);
@@ -492,11 +484,11 @@ impl PDRouter {
// Start health checkers for both worker pools
let prefill_health_checker = crate::core::start_health_checker(
Arc::clone(&prefill_workers),
health_check_config.check_interval_secs,
ctx.router_config.health_check.check_interval_secs,
);
let decode_health_checker = crate::core::start_health_checker(
Arc::clone(&decode_workers),
health_check_config.check_interval_secs,
ctx.router_config.health_check.check_interval_secs,
);
// Build a dedicated prefill client for fire-and-forget semantics
@@ -504,7 +496,7 @@ impl PDRouter {
.pool_max_idle_per_host(0)
.http1_only()
.connect_timeout(Duration::from_millis(300))
.timeout(Duration::from_secs(prefill_request_timeout_secs))
.timeout(Duration::from_secs(ctx.router_config.request_timeout_secs))
.build()
.map_err(|e| format!("Failed to build prefill client: {}", e))?;
@@ -582,14 +574,16 @@ impl PDRouter {
decode_workers,
prefill_policy,
decode_policy,
worker_startup_timeout_secs,
worker_startup_check_interval_secs,
worker_startup_timeout_secs: ctx.router_config.worker_startup_timeout_secs,
worker_startup_check_interval_secs: ctx
.router_config
.worker_startup_check_interval_secs,
worker_loads,
load_monitor_handle,
client,
client: ctx.client.clone(),
prefill_client,
prefill_drain_tx,
retry_config,
retry_config: ctx.router_config.effective_retry_config(),
circuit_breaker_config: core_cb_config,
_prefill_health_checker: Some(prefill_health_checker),
_decode_health_checker: Some(decode_health_checker),

View File

@@ -1,7 +1,4 @@
use crate::config::types::{
CircuitBreakerConfig as ConfigCircuitBreakerConfig,
HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig,
};
use crate::config::types::RetryConfig;
use crate::core::{
is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig,
RetryExecutor, Worker, WorkerFactory, WorkerType,
@@ -51,14 +48,7 @@ impl Router {
pub async fn new(
worker_urls: Vec<String>,
policy: Arc<dyn LoadBalancingPolicy>,
client: Client,
worker_startup_timeout_secs: u64,
worker_startup_check_interval_secs: u64,
dp_aware: bool,
api_key: Option<String>,
retry_config: RetryConfig,
circuit_breaker_config: ConfigCircuitBreakerConfig,
health_check_config: ConfigHealthCheckConfig,
ctx: &Arc<crate::server::AppContext>,
) -> Result<Self, String> {
// Update active workers gauge
RouterMetrics::set_active_workers(worker_urls.len());
@@ -67,21 +57,22 @@ impl Router {
if !worker_urls.is_empty() {
Self::wait_for_healthy_workers(
&worker_urls,
worker_startup_timeout_secs,
worker_startup_check_interval_secs,
ctx.router_config.worker_startup_timeout_secs,
ctx.router_config.worker_startup_check_interval_secs,
)
.await?;
}
let worker_urls = if dp_aware {
let worker_urls = if ctx.router_config.dp_aware {
// worker address now in the format of "http://host:port@dp_rank"
Self::get_dp_aware_workers(&worker_urls, &api_key)
Self::get_dp_aware_workers(&worker_urls, &ctx.router_config.api_key)
.map_err(|e| format!("Failed to get dp-aware workers: {}", e))?
} else {
worker_urls
};
// Convert config CircuitBreakerConfig to core CircuitBreakerConfig
let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config();
let core_cb_config = CircuitBreakerConfig {
failure_threshold: circuit_breaker_config.failure_threshold,
success_threshold: circuit_breaker_config.success_threshold,
@@ -96,11 +87,11 @@ impl Router {
let worker = BasicWorker::new(url.clone(), WorkerType::Regular)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
timeout_secs: ctx.router_config.health_check.timeout_secs,
check_interval_secs: ctx.router_config.health_check.check_interval_secs,
endpoint: ctx.router_config.health_check.endpoint.clone(),
failure_threshold: ctx.router_config.health_check.failure_threshold,
success_threshold: ctx.router_config.health_check.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
@@ -117,7 +108,7 @@ impl Router {
let workers = Arc::new(RwLock::new(workers));
let health_checker = crate::core::start_health_checker(
Arc::clone(&workers),
worker_startup_check_interval_secs,
ctx.router_config.worker_startup_check_interval_secs,
);
// Setup load monitoring for PowerOfTwo policy
@@ -126,9 +117,9 @@ impl Router {
let load_monitor_handle = if policy.name() == "power_of_two" {
let monitor_urls = worker_urls.clone();
let monitor_interval = worker_startup_check_interval_secs;
let monitor_interval = ctx.router_config.worker_startup_check_interval_secs;
let policy_clone = Arc::clone(&policy);
let client_clone = client.clone();
let client_clone = ctx.client.clone();
Some(Arc::new(tokio::spawn(async move {
Self::monitor_worker_loads(
@@ -147,12 +138,14 @@ impl Router {
Ok(Router {
workers,
policy,
client,
worker_startup_timeout_secs,
worker_startup_check_interval_secs,
dp_aware,
api_key,
retry_config,
client: ctx.client.clone(),
worker_startup_timeout_secs: ctx.router_config.worker_startup_timeout_secs,
worker_startup_check_interval_secs: ctx
.router_config
.worker_startup_check_interval_secs,
dp_aware: ctx.router_config.dp_aware,
api_key: ctx.router_config.api_key.clone(),
retry_config: ctx.router_config.effective_retry_config(),
circuit_breaker_config: core_cb_config,
_worker_loads: worker_loads,
_load_monitor_handle: load_monitor_handle,