From 780d6a22cd2e2ad6dd3ee6f92ed0da3d74c5c3e3 Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Fri, 19 Sep 2025 00:47:56 -0400 Subject: [PATCH] [router] refactor worker to builder pattern 2/n (#10633) --- sgl-router/src/core/worker.rs | 121 ++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/sgl-router/src/core/worker.rs b/sgl-router/src/core/worker.rs index e68ef6886..e5a339feb 100644 --- a/sgl-router/src/core/worker.rs +++ b/sgl-router/src/core/worker.rs @@ -1,4 +1,6 @@ use super::{CircuitBreaker, CircuitBreakerConfig, WorkerError, WorkerResult}; +use crate::core::CircuitState; +use crate::core::{BasicWorkerBuilder, DPAwareWorkerBuilder}; use crate::grpc::SglangSchedulerClient; use crate::metrics::RouterMetrics; use async_trait::async_trait; @@ -96,22 +98,22 @@ pub trait Worker: Send + Sync + fmt::Debug { if before != after { let from = match before { - crate::core::CircuitState::Closed => "closed", - crate::core::CircuitState::Open => "open", - crate::core::CircuitState::HalfOpen => "half_open", + CircuitState::Closed => "closed", + CircuitState::Open => "open", + CircuitState::HalfOpen => "half_open", }; let to = match after { - crate::core::CircuitState::Closed => "closed", - crate::core::CircuitState::Open => "open", - crate::core::CircuitState::HalfOpen => "half_open", + CircuitState::Closed => "closed", + CircuitState::Open => "open", + CircuitState::HalfOpen => "half_open", }; RouterMetrics::record_cb_state_transition(self.url(), from, to); } let state_code = match self.circuit_breaker().state() { - crate::core::CircuitState::Closed => 0u8, - crate::core::CircuitState::Open => 1u8, - crate::core::CircuitState::HalfOpen => 2u8, + CircuitState::Closed => 0u8, + CircuitState::Open => 1u8, + CircuitState::HalfOpen => 2u8, }; RouterMetrics::set_cb_state(self.url(), state_code); } @@ -706,6 +708,20 @@ impl Worker for DPAwareWorker { pub struct WorkerFactory; impl WorkerFactory { + /// Create a BasicWorkerBuilder for customizable worker creation + pub fn builder(url: impl Into) -> BasicWorkerBuilder { + BasicWorkerBuilder::new(url) + } + + /// Create a DPAwareWorkerBuilder for customizable DP-aware worker creation + pub fn dp_builder( + base_url: impl Into, + dp_rank: usize, + dp_size: usize, + ) -> DPAwareWorkerBuilder { + DPAwareWorkerBuilder::new(base_url, dp_rank, dp_size) + } + /// Create a regular worker pub fn create_regular(url: String) -> Box { Box::new(BasicWorker::new(url, WorkerType::Regular)) @@ -717,8 +733,9 @@ impl WorkerFactory { circuit_breaker_config: CircuitBreakerConfig, ) -> Box { Box::new( - BasicWorker::new(url, WorkerType::Regular) - .with_circuit_breaker_config(circuit_breaker_config), + BasicWorkerBuilder::new(url) + .circuit_breaker_config(circuit_breaker_config) + .build(), ) } @@ -737,8 +754,10 @@ impl WorkerFactory { circuit_breaker_config: CircuitBreakerConfig, ) -> Box { Box::new( - BasicWorker::new(url, WorkerType::Prefill { bootstrap_port }) - .with_circuit_breaker_config(circuit_breaker_config), + BasicWorkerBuilder::new(url) + .worker_type(WorkerType::Prefill { bootstrap_port }) + .circuit_breaker_config(circuit_breaker_config) + .build(), ) } @@ -753,8 +772,10 @@ impl WorkerFactory { circuit_breaker_config: CircuitBreakerConfig, ) -> Box { Box::new( - BasicWorker::new(url, WorkerType::Decode) - .with_circuit_breaker_config(circuit_breaker_config), + BasicWorkerBuilder::new(url) + .worker_type(WorkerType::Decode) + .circuit_breaker_config(circuit_breaker_config) + .build(), ) } @@ -800,8 +821,11 @@ impl WorkerFactory { circuit_breaker_config: CircuitBreakerConfig, ) -> Box { Box::new( - BasicWorker::with_connection_mode(url, worker_type, ConnectionMode::Grpc { port }) - .with_circuit_breaker_config(circuit_breaker_config), + BasicWorkerBuilder::new(url) + .worker_type(worker_type) + .connection_mode(ConnectionMode::Grpc { port }) + .circuit_breaker_config(circuit_breaker_config) + .build(), ) } @@ -811,13 +835,12 @@ impl WorkerFactory { labels: std::collections::HashMap, circuit_breaker_config: CircuitBreakerConfig, ) -> Box { - let mut worker = BasicWorker::new(url.clone(), WorkerType::Regular) - .with_circuit_breaker_config(circuit_breaker_config); - - // Add labels to metadata - worker.metadata.labels = labels; - - Box::new(worker) + Box::new( + BasicWorkerBuilder::new(url) + .labels(labels) + .circuit_breaker_config(circuit_breaker_config) + .build(), + ) } /// Create a prefill worker with labels @@ -827,13 +850,13 @@ impl WorkerFactory { labels: std::collections::HashMap, circuit_breaker_config: CircuitBreakerConfig, ) -> Box { - let mut worker = BasicWorker::new(url.clone(), WorkerType::Prefill { bootstrap_port }) - .with_circuit_breaker_config(circuit_breaker_config); - - // Add labels to metadata - worker.metadata.labels = labels; - - Box::new(worker) + Box::new( + BasicWorkerBuilder::new(url) + .worker_type(WorkerType::Prefill { bootstrap_port }) + .labels(labels) + .circuit_breaker_config(circuit_breaker_config) + .build(), + ) } /// Create a decode worker with labels @@ -842,13 +865,13 @@ impl WorkerFactory { labels: std::collections::HashMap, circuit_breaker_config: CircuitBreakerConfig, ) -> Box { - let mut worker = BasicWorker::new(url.clone(), WorkerType::Decode) - .with_circuit_breaker_config(circuit_breaker_config); - - // Add labels to metadata - worker.metadata.labels = labels; - - Box::new(worker) + Box::new( + BasicWorkerBuilder::new(url) + .worker_type(WorkerType::Decode) + .labels(labels) + .circuit_breaker_config(circuit_breaker_config) + .build(), + ) } /// Create a DP-aware worker of specified type @@ -1910,10 +1933,7 @@ mod tests { // Initial state should be available assert!(worker.is_available()); - assert_eq!( - worker.circuit_breaker().state(), - crate::core::CircuitState::Closed - ); + assert_eq!(worker.circuit_breaker().state(), CircuitState::Closed); // Record some failures worker.record_outcome(false); @@ -1935,7 +1955,7 @@ mod tests { #[test] fn test_worker_with_circuit_breaker_config() { - let config = crate::core::CircuitBreakerConfig { + let config = CircuitBreakerConfig { failure_threshold: 2, success_threshold: 1, timeout_duration: Duration::from_millis(100), @@ -1956,17 +1976,11 @@ mod tests { // Should be half-open assert!(worker.is_available()); - assert_eq!( - worker.circuit_breaker().state(), - crate::core::CircuitState::HalfOpen - ); + assert_eq!(worker.circuit_breaker().state(), CircuitState::HalfOpen); // Success should close it worker.record_outcome(true); - assert_eq!( - worker.circuit_breaker().state(), - crate::core::CircuitState::Closed - ); + assert_eq!(worker.circuit_breaker().state(), CircuitState::Closed); } #[test] @@ -1984,10 +1998,7 @@ mod tests { // Should not be available assert!(!dp_worker.is_available()); - assert_eq!( - dp_worker.circuit_breaker().state(), - crate::core::CircuitState::Open - ); + assert_eq!(dp_worker.circuit_breaker().state(), CircuitState::Open); } // ===== Integration tests =====