From ddab4fc7c70dd6fa1154b0e5d39255078fbbbad6 Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Tue, 23 Sep 2025 11:53:49 -0400 Subject: [PATCH] [router] fix cache aware routing strategy and lock contention (#10773) --- .github/workflows/pr-test-pd-router.yml | 2 +- sgl-router/src/policies/cache_aware.rs | 163 ++++++++++-------------- 2 files changed, 70 insertions(+), 95 deletions(-) diff --git a/.github/workflows/pr-test-pd-router.yml b/.github/workflows/pr-test-pd-router.yml index 45e210b68..68900c94f 100644 --- a/.github/workflows/pr-test-pd-router.yml +++ b/.github/workflows/pr-test-pd-router.yml @@ -219,7 +219,7 @@ jobs: --decode http://127.0.0.7:30007 \ --decode http://127.0.0.8:30008 \ --host 127.0.0.9 \ - --log-level warning \ + --log-level warn \ --port 8000 & ROUTER_PID=$! diff --git a/sgl-router/src/policies/cache_aware.rs b/sgl-router/src/policies/cache_aware.rs index eeeb7b36b..75f03ed04 100644 --- a/sgl-router/src/policies/cache_aware.rs +++ b/sgl-router/src/policies/cache_aware.rs @@ -129,11 +129,14 @@ impl CacheAwarePolicy { // Use "default" for unknown/empty model_ids for backward compatibility let model_id = worker.model_id(); let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() + "default" } else { - model_id.to_string() + model_id }; - model_workers.entry(tree_key).or_default().push(worker); + model_workers + .entry(tree_key.to_string()) + .or_default() + .push(worker); } // Initialize tree for each model @@ -153,11 +156,11 @@ impl CacheAwarePolicy { // use a default tree. This preserves existing behavior for single-model routers. let model_id = worker.model_id(); let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() + "default" } else { - model_id.to_string() + model_id }; - let tree = trees.entry(tree_key).or_insert_with(Tree::new); + let tree = trees.entry(tree_key.to_string()).or_insert_with(Tree::new); tree.insert("", worker.url()); } } @@ -176,11 +179,11 @@ impl CacheAwarePolicy { // Use same logic as add_worker for consistency let model_id = worker.model_id(); let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() + "default" } else { - model_id.to_string() + model_id }; - if let Some(tree) = trees.get_mut(&tree_key) { + if let Some(tree) = trees.get_mut(tree_key) { tree.remove_tenant(worker.url()); } } @@ -222,17 +225,14 @@ impl LoadBalancingPolicy for CacheAwarePolicy { return None; } - // Group workers by model (using "default" for unknown/empty model_ids) - let mut model_workers: HashMap> = HashMap::new(); - for idx in &healthy_indices { - let model_id = workers[*idx].model_id(); - let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() - } else { - model_id.to_string() - }; - model_workers.entry(tree_key).or_default().push(*idx); - } + // Determine the model for this set of workers (router pre-filters by model) + // All workers should be from the same model + let first_model = workers[healthy_indices[0]].model_id(); + let model_id = if first_model.is_empty() || first_model == "unknown" { + "default" + } else { + first_model + }; // Get current load statistics let loads: Vec = workers.iter().map(|w| w.load()).collect(); @@ -267,13 +267,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy { // Even in imbalanced mode, update the tree to maintain cache state if let Some(text) = request_text { if let Ok(mut trees) = self.trees.lock() { - let model_id = workers[min_load_idx].model_id(); - let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() + // Avoid allocation if tree already exists + let tree = if let Some(tree) = trees.get_mut(model_id) { + tree } else { - model_id.to_string() + // Create new tree and initialize with all workers + let new_tree = Tree::new(); + // Initialize with all healthy workers like OLD version does + for &idx in &healthy_indices { + new_tree.insert("", workers[idx].url()); + } + trees.entry(model_id.to_string()).or_insert(new_tree) }; - let tree = trees.entry(tree_key).or_insert_with(Tree::new); tree.insert(text, workers[min_load_idx].url()); } } @@ -290,84 +295,54 @@ impl LoadBalancingPolicy for CacheAwarePolicy { let text = request_text.unwrap_or(""); if let Ok(mut trees) = self.trees.lock() { - let mut best_match_idx: Option = None; - let mut best_match_rate: f32 = 0.0; - - // Find best match across all models - for (model_id, worker_indices) in &model_workers { - let tree = trees.entry(model_id.clone()).or_insert_with(Tree::new); - - let (matched_text, matched_worker) = tree.prefix_match(text); - let match_rate = if text.is_empty() { - 0.0 - } else { - matched_text.chars().count() as f32 / text.chars().count() as f32 - }; - - // Check if this model has the best match - if match_rate > best_match_rate { - // Find the worker index for this URL - if let Some(idx) = worker_indices - .iter() - .find(|&&idx| workers[idx].url() == matched_worker) - { - best_match_idx = Some(*idx); - best_match_rate = match_rate; - } + // Avoid allocation if tree already exists + let tree = if let Some(tree) = trees.get_mut(model_id) { + tree + } else { + // Create new tree and initialize with all workers + let new_tree = Tree::new(); + // Initialize with all healthy workers like OLD version does + for &idx in &healthy_indices { + new_tree.insert("", workers[idx].url()); } - } + trees.entry(model_id.to_string()).or_insert(new_tree) + }; + let (matched_text, matched_worker) = tree.prefix_match(text); + let match_rate = if text.is_empty() { + 0.0 + } else { + matched_text.chars().count() as f32 / text.chars().count() as f32 + }; - // Select worker based on cache threshold - let selected_idx = if let (Some(idx), true) = ( - best_match_idx, - best_match_rate > self.config.cache_threshold, - ) { + let selected_url = if match_rate > self.config.cache_threshold { RouterMetrics::record_cache_hit(); - idx + matched_worker.to_string() } else { RouterMetrics::record_cache_miss(); - - // Find model with smallest tree (most cache capacity) - let mut smallest_tree_model = String::new(); - let mut smallest_tree_size = usize::MAX; - - for model_id in model_workers.keys() { - let tree = trees.entry(model_id.clone()).or_insert_with(Tree::new); - let size = tree.get_used_size_per_tenant().values().sum::(); - if size < smallest_tree_size { - smallest_tree_size = size; - smallest_tree_model = model_id.clone(); - } - } - - // Select least loaded worker from model with most cache capacity - if let Some(worker_indices) = model_workers.get(&smallest_tree_model) { - worker_indices - .iter() - .min_by_key(|&&idx| workers[idx].load()) - .copied() - .unwrap_or(healthy_indices[0]) - } else { - healthy_indices[0] - } + tree.get_smallest_tenant() }; - // Update the tree with this request - let model_id = workers[selected_idx].model_id(); - let tree_key = if model_id.is_empty() || model_id == "unknown" { - "default".to_string() + // Find the index of the selected worker + if let Some(selected_idx) = workers.iter().position(|w| w.url() == selected_url) { + // Only proceed if the worker is healthy - use direct check like OLD version + if workers[selected_idx].is_healthy() { + // Update the tree with this request + tree.insert(text, &selected_url); + + // Increment processed counter + workers[selected_idx].increment_processed(); + RouterMetrics::record_processed_request(&selected_url); + + return Some(selected_idx); + } } else { - model_id.to_string() - }; - let tree = trees.entry(tree_key).or_insert_with(Tree::new); - tree.insert(text, workers[selected_idx].url()); + // Selected worker no longer exists, remove it from tree + tree.remove_tenant(&selected_url); + debug!("Removed stale worker {} from cache tree", selected_url); + } - // Increment processed counter - workers[selected_idx].increment_processed(); - RouterMetrics::record_processed_request(workers[selected_idx].url()); - RouterMetrics::record_policy_decision(self.name(), workers[selected_idx].url()); - - return Some(selected_idx); + // Fallback to first healthy worker + return healthy_indices.first().copied(); } // Fallback to first healthy worker if tree operations fail