[router] Expose worker startup interval (#3019)
This commit is contained in:
@@ -34,6 +34,7 @@ class RouterArgs:
|
|||||||
# Routing policy
|
# Routing policy
|
||||||
policy: str = "cache_aware"
|
policy: str = "cache_aware"
|
||||||
worker_startup_timeout_secs: int = 300
|
worker_startup_timeout_secs: int = 300
|
||||||
|
worker_startup_check_interval: int = 10
|
||||||
cache_threshold: float = 0.5
|
cache_threshold: float = 0.5
|
||||||
balance_abs_threshold: int = 32
|
balance_abs_threshold: int = 32
|
||||||
balance_rel_threshold: float = 1.0001
|
balance_rel_threshold: float = 1.0001
|
||||||
@@ -94,6 +95,12 @@ class RouterArgs:
|
|||||||
default=RouterArgs.worker_startup_timeout_secs,
|
default=RouterArgs.worker_startup_timeout_secs,
|
||||||
help="Timeout in seconds for worker startup",
|
help="Timeout in seconds for worker startup",
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
f"--{prefix}worker-startup-check-interval",
|
||||||
|
type=int,
|
||||||
|
default=RouterArgs.worker_startup_check_interval,
|
||||||
|
help="Interval in seconds between checks for worker startup",
|
||||||
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
f"--{prefix}cache-threshold",
|
f"--{prefix}cache-threshold",
|
||||||
type=float,
|
type=float,
|
||||||
@@ -157,6 +164,9 @@ class RouterArgs:
|
|||||||
worker_startup_timeout_secs=getattr(
|
worker_startup_timeout_secs=getattr(
|
||||||
args, f"{prefix}worker_startup_timeout_secs"
|
args, f"{prefix}worker_startup_timeout_secs"
|
||||||
),
|
),
|
||||||
|
worker_startup_check_interval=getattr(
|
||||||
|
args, f"{prefix}worker_startup_check_interval"
|
||||||
|
),
|
||||||
cache_threshold=getattr(args, f"{prefix}cache_threshold"),
|
cache_threshold=getattr(args, f"{prefix}cache_threshold"),
|
||||||
balance_abs_threshold=getattr(args, f"{prefix}balance_abs_threshold"),
|
balance_abs_threshold=getattr(args, f"{prefix}balance_abs_threshold"),
|
||||||
balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"),
|
balance_rel_threshold=getattr(args, f"{prefix}balance_rel_threshold"),
|
||||||
@@ -202,6 +212,7 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
|
|||||||
port=router_args.port,
|
port=router_args.port,
|
||||||
policy=policy_from_str(router_args.policy),
|
policy=policy_from_str(router_args.policy),
|
||||||
worker_startup_timeout_secs=router_args.worker_startup_timeout_secs,
|
worker_startup_timeout_secs=router_args.worker_startup_timeout_secs,
|
||||||
|
worker_startup_check_interval=router_args.worker_startup_check_interval,
|
||||||
cache_threshold=router_args.cache_threshold,
|
cache_threshold=router_args.cache_threshold,
|
||||||
balance_abs_threshold=router_args.balance_abs_threshold,
|
balance_abs_threshold=router_args.balance_abs_threshold,
|
||||||
balance_rel_threshold=router_args.balance_rel_threshold,
|
balance_rel_threshold=router_args.balance_rel_threshold,
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ class Router:
|
|||||||
host: Host address to bind the router server. Default: '127.0.0.1'
|
host: Host address to bind the router server. Default: '127.0.0.1'
|
||||||
port: Port number to bind the router server. Default: 3001
|
port: Port number to bind the router server. Default: 3001
|
||||||
worker_startup_timeout_secs: Timeout in seconds for worker startup. Default: 300
|
worker_startup_timeout_secs: Timeout in seconds for worker startup. Default: 300
|
||||||
|
worker_startup_check_interval: Interval in seconds between checks for worker initialization. Default: 10
|
||||||
cache_threshold: Cache threshold (0.0-1.0) for cache-aware routing. Routes to cached worker
|
cache_threshold: Cache threshold (0.0-1.0) for cache-aware routing. Routes to cached worker
|
||||||
if the match rate exceeds threshold, otherwise routes to the worker with the smallest
|
if the match rate exceeds threshold, otherwise routes to the worker with the smallest
|
||||||
tree. Default: 0.5
|
tree. Default: 0.5
|
||||||
@@ -39,6 +40,7 @@ class Router:
|
|||||||
host: str = "127.0.0.1",
|
host: str = "127.0.0.1",
|
||||||
port: int = 3001,
|
port: int = 3001,
|
||||||
worker_startup_timeout_secs: int = 300,
|
worker_startup_timeout_secs: int = 300,
|
||||||
|
worker_startup_check_interval: int = 10,
|
||||||
cache_threshold: float = 0.50,
|
cache_threshold: float = 0.50,
|
||||||
balance_abs_threshold: int = 32,
|
balance_abs_threshold: int = 32,
|
||||||
balance_rel_threshold: float = 1.0001,
|
balance_rel_threshold: float = 1.0001,
|
||||||
@@ -53,6 +55,7 @@ class Router:
|
|||||||
host=host,
|
host=host,
|
||||||
port=port,
|
port=port,
|
||||||
worker_startup_timeout_secs=worker_startup_timeout_secs,
|
worker_startup_timeout_secs=worker_startup_timeout_secs,
|
||||||
|
worker_startup_check_interval=worker_startup_check_interval,
|
||||||
cache_threshold=cache_threshold,
|
cache_threshold=cache_threshold,
|
||||||
balance_abs_threshold=balance_abs_threshold,
|
balance_abs_threshold=balance_abs_threshold,
|
||||||
balance_rel_threshold=balance_rel_threshold,
|
balance_rel_threshold=balance_rel_threshold,
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ class TestLaunchRouter(unittest.TestCase):
|
|||||||
port=30000,
|
port=30000,
|
||||||
policy="cache_aware",
|
policy="cache_aware",
|
||||||
worker_startup_timeout_secs=600,
|
worker_startup_timeout_secs=600,
|
||||||
|
worker_startup_check_interval=10,
|
||||||
cache_threshold=0.5,
|
cache_threshold=0.5,
|
||||||
balance_abs_threshold=32,
|
balance_abs_threshold=32,
|
||||||
balance_rel_threshold=1.0001,
|
balance_rel_threshold=1.0001,
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ struct Router {
|
|||||||
worker_urls: Vec<String>,
|
worker_urls: Vec<String>,
|
||||||
policy: PolicyType,
|
policy: PolicyType,
|
||||||
worker_startup_timeout_secs: u64,
|
worker_startup_timeout_secs: u64,
|
||||||
|
worker_startup_check_interval: u64,
|
||||||
cache_threshold: f32,
|
cache_threshold: f32,
|
||||||
balance_abs_threshold: usize,
|
balance_abs_threshold: usize,
|
||||||
balance_rel_threshold: f32,
|
balance_rel_threshold: f32,
|
||||||
@@ -36,6 +37,7 @@ impl Router {
|
|||||||
host = String::from("127.0.0.1"),
|
host = String::from("127.0.0.1"),
|
||||||
port = 3001,
|
port = 3001,
|
||||||
worker_startup_timeout_secs = 300,
|
worker_startup_timeout_secs = 300,
|
||||||
|
worker_startup_check_interval = 10,
|
||||||
cache_threshold = 0.50,
|
cache_threshold = 0.50,
|
||||||
balance_abs_threshold = 32,
|
balance_abs_threshold = 32,
|
||||||
balance_rel_threshold = 1.0001,
|
balance_rel_threshold = 1.0001,
|
||||||
@@ -50,6 +52,7 @@ impl Router {
|
|||||||
host: String,
|
host: String,
|
||||||
port: u16,
|
port: u16,
|
||||||
worker_startup_timeout_secs: u64,
|
worker_startup_timeout_secs: u64,
|
||||||
|
worker_startup_check_interval: u64,
|
||||||
cache_threshold: f32,
|
cache_threshold: f32,
|
||||||
balance_abs_threshold: usize,
|
balance_abs_threshold: usize,
|
||||||
balance_rel_threshold: f32,
|
balance_rel_threshold: f32,
|
||||||
@@ -64,6 +67,7 @@ impl Router {
|
|||||||
worker_urls,
|
worker_urls,
|
||||||
policy,
|
policy,
|
||||||
worker_startup_timeout_secs,
|
worker_startup_timeout_secs,
|
||||||
|
worker_startup_check_interval,
|
||||||
cache_threshold,
|
cache_threshold,
|
||||||
balance_abs_threshold,
|
balance_abs_threshold,
|
||||||
balance_rel_threshold,
|
balance_rel_threshold,
|
||||||
@@ -78,12 +82,15 @@ impl Router {
|
|||||||
let policy_config = match &self.policy {
|
let policy_config = match &self.policy {
|
||||||
PolicyType::Random => router::PolicyConfig::RandomConfig {
|
PolicyType::Random => router::PolicyConfig::RandomConfig {
|
||||||
timeout_secs: self.worker_startup_timeout_secs,
|
timeout_secs: self.worker_startup_timeout_secs,
|
||||||
|
interval_secs: self.worker_startup_check_interval,
|
||||||
},
|
},
|
||||||
PolicyType::RoundRobin => router::PolicyConfig::RoundRobinConfig {
|
PolicyType::RoundRobin => router::PolicyConfig::RoundRobinConfig {
|
||||||
timeout_secs: self.worker_startup_timeout_secs,
|
timeout_secs: self.worker_startup_timeout_secs,
|
||||||
|
interval_secs: self.worker_startup_check_interval,
|
||||||
},
|
},
|
||||||
PolicyType::CacheAware => router::PolicyConfig::CacheAwareConfig {
|
PolicyType::CacheAware => router::PolicyConfig::CacheAwareConfig {
|
||||||
timeout_secs: self.worker_startup_timeout_secs,
|
timeout_secs: self.worker_startup_timeout_secs,
|
||||||
|
interval_secs: self.worker_startup_check_interval,
|
||||||
cache_threshold: self.cache_threshold,
|
cache_threshold: self.cache_threshold,
|
||||||
balance_abs_threshold: self.balance_abs_threshold,
|
balance_abs_threshold: self.balance_abs_threshold,
|
||||||
balance_rel_threshold: self.balance_rel_threshold,
|
balance_rel_threshold: self.balance_rel_threshold,
|
||||||
|
|||||||
@@ -18,10 +18,12 @@ pub enum Router {
|
|||||||
worker_urls: Arc<RwLock<Vec<String>>>,
|
worker_urls: Arc<RwLock<Vec<String>>>,
|
||||||
current_index: AtomicUsize,
|
current_index: AtomicUsize,
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
},
|
},
|
||||||
Random {
|
Random {
|
||||||
worker_urls: Arc<RwLock<Vec<String>>>,
|
worker_urls: Arc<RwLock<Vec<String>>>,
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
},
|
},
|
||||||
CacheAware {
|
CacheAware {
|
||||||
/*
|
/*
|
||||||
@@ -92,6 +94,7 @@ pub enum Router {
|
|||||||
balance_abs_threshold: usize,
|
balance_abs_threshold: usize,
|
||||||
balance_rel_threshold: f32,
|
balance_rel_threshold: f32,
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
_eviction_thread: Option<thread::JoinHandle<()>>,
|
_eviction_thread: Option<thread::JoinHandle<()>>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -100,9 +103,11 @@ pub enum Router {
|
|||||||
pub enum PolicyConfig {
|
pub enum PolicyConfig {
|
||||||
RandomConfig {
|
RandomConfig {
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
},
|
},
|
||||||
RoundRobinConfig {
|
RoundRobinConfig {
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
},
|
},
|
||||||
CacheAwareConfig {
|
CacheAwareConfig {
|
||||||
cache_threshold: f32,
|
cache_threshold: f32,
|
||||||
@@ -111,31 +116,50 @@ pub enum PolicyConfig {
|
|||||||
eviction_interval_secs: u64,
|
eviction_interval_secs: u64,
|
||||||
max_tree_size: usize,
|
max_tree_size: usize,
|
||||||
timeout_secs: u64,
|
timeout_secs: u64,
|
||||||
|
interval_secs: u64,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Router {
|
impl Router {
|
||||||
pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> {
|
pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> {
|
||||||
// Get timeout from policy config
|
// Get timeout and interval from policy config
|
||||||
let timeout_secs = match &policy_config {
|
let (timeout_secs, interval_secs) = match &policy_config {
|
||||||
PolicyConfig::RandomConfig { timeout_secs } => *timeout_secs,
|
PolicyConfig::RandomConfig {
|
||||||
PolicyConfig::RoundRobinConfig { timeout_secs } => *timeout_secs,
|
timeout_secs,
|
||||||
PolicyConfig::CacheAwareConfig { timeout_secs, .. } => *timeout_secs,
|
interval_secs,
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
|
PolicyConfig::RoundRobinConfig {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
|
PolicyConfig::CacheAwareConfig {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
..
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Wait until all workers are healthy
|
// Wait until all workers are healthy
|
||||||
Self::wait_for_healthy_workers(&worker_urls, timeout_secs, 10)?;
|
Self::wait_for_healthy_workers(&worker_urls, timeout_secs, interval_secs)?;
|
||||||
|
|
||||||
// Create router based on policy...
|
// Create router based on policy...
|
||||||
Ok(match policy_config {
|
Ok(match policy_config {
|
||||||
PolicyConfig::RandomConfig { timeout_secs } => Router::Random {
|
PolicyConfig::RandomConfig {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
} => Router::Random {
|
||||||
worker_urls: Arc::new(RwLock::new(worker_urls)),
|
worker_urls: Arc::new(RwLock::new(worker_urls)),
|
||||||
timeout_secs,
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
},
|
},
|
||||||
PolicyConfig::RoundRobinConfig { timeout_secs } => Router::RoundRobin {
|
PolicyConfig::RoundRobinConfig {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
} => Router::RoundRobin {
|
||||||
worker_urls: Arc::new(RwLock::new(worker_urls)),
|
worker_urls: Arc::new(RwLock::new(worker_urls)),
|
||||||
current_index: std::sync::atomic::AtomicUsize::new(0),
|
current_index: std::sync::atomic::AtomicUsize::new(0),
|
||||||
timeout_secs,
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
},
|
},
|
||||||
PolicyConfig::CacheAwareConfig {
|
PolicyConfig::CacheAwareConfig {
|
||||||
cache_threshold,
|
cache_threshold,
|
||||||
@@ -144,6 +168,7 @@ impl Router {
|
|||||||
eviction_interval_secs,
|
eviction_interval_secs,
|
||||||
max_tree_size,
|
max_tree_size,
|
||||||
timeout_secs,
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
} => {
|
} => {
|
||||||
let mut running_queue = HashMap::new();
|
let mut running_queue = HashMap::new();
|
||||||
for url in &worker_urls {
|
for url in &worker_urls {
|
||||||
@@ -195,6 +220,7 @@ impl Router {
|
|||||||
balance_abs_threshold,
|
balance_abs_threshold,
|
||||||
balance_rel_threshold,
|
balance_rel_threshold,
|
||||||
timeout_secs,
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
_eviction_thread: Some(eviction_thread),
|
_eviction_thread: Some(eviction_thread),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -594,11 +620,22 @@ impl Router {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
|
pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
|
||||||
let interval_secs = 10; // check every 10 seconds
|
let (timeout_secs, interval_secs) = match self {
|
||||||
let timeout_secs = match self {
|
Router::Random {
|
||||||
Router::Random { timeout_secs, .. } => *timeout_secs,
|
timeout_secs,
|
||||||
Router::RoundRobin { timeout_secs, .. } => *timeout_secs,
|
interval_secs,
|
||||||
Router::CacheAware { timeout_secs, .. } => *timeout_secs,
|
..
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
|
Router::RoundRobin {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
..
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
|
Router::CacheAware {
|
||||||
|
timeout_secs,
|
||||||
|
interval_secs,
|
||||||
|
..
|
||||||
|
} => (*timeout_secs, *interval_secs),
|
||||||
};
|
};
|
||||||
|
|
||||||
let start_time = std::time::Instant::now();
|
let start_time = std::time::Instant::now();
|
||||||
|
|||||||
Reference in New Issue
Block a user