[router] fix cache aware routing strategy and lock contention (#10773)
This commit is contained in:
2
.github/workflows/pr-test-pd-router.yml
vendored
2
.github/workflows/pr-test-pd-router.yml
vendored
@@ -219,7 +219,7 @@ jobs:
|
||||
--decode http://127.0.0.7:30007 \
|
||||
--decode http://127.0.0.8:30008 \
|
||||
--host 127.0.0.9 \
|
||||
--log-level warning \
|
||||
--log-level warn \
|
||||
--port 8000 &
|
||||
ROUTER_PID=$!
|
||||
|
||||
|
||||
@@ -129,11 +129,14 @@ impl CacheAwarePolicy {
|
||||
// Use "default" for unknown/empty model_ids for backward compatibility
|
||||
let model_id = worker.model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
"default"
|
||||
} else {
|
||||
model_id.to_string()
|
||||
model_id
|
||||
};
|
||||
model_workers.entry(tree_key).or_default().push(worker);
|
||||
model_workers
|
||||
.entry(tree_key.to_string())
|
||||
.or_default()
|
||||
.push(worker);
|
||||
}
|
||||
|
||||
// Initialize tree for each model
|
||||
@@ -153,11 +156,11 @@ impl CacheAwarePolicy {
|
||||
// use a default tree. This preserves existing behavior for single-model routers.
|
||||
let model_id = worker.model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
"default"
|
||||
} else {
|
||||
model_id.to_string()
|
||||
model_id
|
||||
};
|
||||
let tree = trees.entry(tree_key).or_insert_with(Tree::new);
|
||||
let tree = trees.entry(tree_key.to_string()).or_insert_with(Tree::new);
|
||||
tree.insert("", worker.url());
|
||||
}
|
||||
}
|
||||
@@ -176,11 +179,11 @@ impl CacheAwarePolicy {
|
||||
// Use same logic as add_worker for consistency
|
||||
let model_id = worker.model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
"default"
|
||||
} else {
|
||||
model_id.to_string()
|
||||
model_id
|
||||
};
|
||||
if let Some(tree) = trees.get_mut(&tree_key) {
|
||||
if let Some(tree) = trees.get_mut(tree_key) {
|
||||
tree.remove_tenant(worker.url());
|
||||
}
|
||||
}
|
||||
@@ -222,17 +225,14 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Group workers by model (using "default" for unknown/empty model_ids)
|
||||
let mut model_workers: HashMap<String, Vec<usize>> = HashMap::new();
|
||||
for idx in &healthy_indices {
|
||||
let model_id = workers[*idx].model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
} else {
|
||||
model_id.to_string()
|
||||
};
|
||||
model_workers.entry(tree_key).or_default().push(*idx);
|
||||
}
|
||||
// Determine the model for this set of workers (router pre-filters by model)
|
||||
// All workers should be from the same model
|
||||
let first_model = workers[healthy_indices[0]].model_id();
|
||||
let model_id = if first_model.is_empty() || first_model == "unknown" {
|
||||
"default"
|
||||
} else {
|
||||
first_model
|
||||
};
|
||||
|
||||
// Get current load statistics
|
||||
let loads: Vec<usize> = workers.iter().map(|w| w.load()).collect();
|
||||
@@ -267,13 +267,18 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
|
||||
// Even in imbalanced mode, update the tree to maintain cache state
|
||||
if let Some(text) = request_text {
|
||||
if let Ok(mut trees) = self.trees.lock() {
|
||||
let model_id = workers[min_load_idx].model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
// Avoid allocation if tree already exists
|
||||
let tree = if let Some(tree) = trees.get_mut(model_id) {
|
||||
tree
|
||||
} else {
|
||||
model_id.to_string()
|
||||
// Create new tree and initialize with all workers
|
||||
let new_tree = Tree::new();
|
||||
// Initialize with all healthy workers like OLD version does
|
||||
for &idx in &healthy_indices {
|
||||
new_tree.insert("", workers[idx].url());
|
||||
}
|
||||
trees.entry(model_id.to_string()).or_insert(new_tree)
|
||||
};
|
||||
let tree = trees.entry(tree_key).or_insert_with(Tree::new);
|
||||
tree.insert(text, workers[min_load_idx].url());
|
||||
}
|
||||
}
|
||||
@@ -290,84 +295,54 @@ impl LoadBalancingPolicy for CacheAwarePolicy {
|
||||
let text = request_text.unwrap_or("");
|
||||
|
||||
if let Ok(mut trees) = self.trees.lock() {
|
||||
let mut best_match_idx: Option<usize> = None;
|
||||
let mut best_match_rate: f32 = 0.0;
|
||||
|
||||
// Find best match across all models
|
||||
for (model_id, worker_indices) in &model_workers {
|
||||
let tree = trees.entry(model_id.clone()).or_insert_with(Tree::new);
|
||||
|
||||
let (matched_text, matched_worker) = tree.prefix_match(text);
|
||||
let match_rate = if text.is_empty() {
|
||||
0.0
|
||||
} else {
|
||||
matched_text.chars().count() as f32 / text.chars().count() as f32
|
||||
};
|
||||
|
||||
// Check if this model has the best match
|
||||
if match_rate > best_match_rate {
|
||||
// Find the worker index for this URL
|
||||
if let Some(idx) = worker_indices
|
||||
.iter()
|
||||
.find(|&&idx| workers[idx].url() == matched_worker)
|
||||
{
|
||||
best_match_idx = Some(*idx);
|
||||
best_match_rate = match_rate;
|
||||
}
|
||||
// Avoid allocation if tree already exists
|
||||
let tree = if let Some(tree) = trees.get_mut(model_id) {
|
||||
tree
|
||||
} else {
|
||||
// Create new tree and initialize with all workers
|
||||
let new_tree = Tree::new();
|
||||
// Initialize with all healthy workers like OLD version does
|
||||
for &idx in &healthy_indices {
|
||||
new_tree.insert("", workers[idx].url());
|
||||
}
|
||||
}
|
||||
trees.entry(model_id.to_string()).or_insert(new_tree)
|
||||
};
|
||||
let (matched_text, matched_worker) = tree.prefix_match(text);
|
||||
let match_rate = if text.is_empty() {
|
||||
0.0
|
||||
} else {
|
||||
matched_text.chars().count() as f32 / text.chars().count() as f32
|
||||
};
|
||||
|
||||
// Select worker based on cache threshold
|
||||
let selected_idx = if let (Some(idx), true) = (
|
||||
best_match_idx,
|
||||
best_match_rate > self.config.cache_threshold,
|
||||
) {
|
||||
let selected_url = if match_rate > self.config.cache_threshold {
|
||||
RouterMetrics::record_cache_hit();
|
||||
idx
|
||||
matched_worker.to_string()
|
||||
} else {
|
||||
RouterMetrics::record_cache_miss();
|
||||
|
||||
// Find model with smallest tree (most cache capacity)
|
||||
let mut smallest_tree_model = String::new();
|
||||
let mut smallest_tree_size = usize::MAX;
|
||||
|
||||
for model_id in model_workers.keys() {
|
||||
let tree = trees.entry(model_id.clone()).or_insert_with(Tree::new);
|
||||
let size = tree.get_used_size_per_tenant().values().sum::<usize>();
|
||||
if size < smallest_tree_size {
|
||||
smallest_tree_size = size;
|
||||
smallest_tree_model = model_id.clone();
|
||||
}
|
||||
}
|
||||
|
||||
// Select least loaded worker from model with most cache capacity
|
||||
if let Some(worker_indices) = model_workers.get(&smallest_tree_model) {
|
||||
worker_indices
|
||||
.iter()
|
||||
.min_by_key(|&&idx| workers[idx].load())
|
||||
.copied()
|
||||
.unwrap_or(healthy_indices[0])
|
||||
} else {
|
||||
healthy_indices[0]
|
||||
}
|
||||
tree.get_smallest_tenant()
|
||||
};
|
||||
|
||||
// Update the tree with this request
|
||||
let model_id = workers[selected_idx].model_id();
|
||||
let tree_key = if model_id.is_empty() || model_id == "unknown" {
|
||||
"default".to_string()
|
||||
// Find the index of the selected worker
|
||||
if let Some(selected_idx) = workers.iter().position(|w| w.url() == selected_url) {
|
||||
// Only proceed if the worker is healthy - use direct check like OLD version
|
||||
if workers[selected_idx].is_healthy() {
|
||||
// Update the tree with this request
|
||||
tree.insert(text, &selected_url);
|
||||
|
||||
// Increment processed counter
|
||||
workers[selected_idx].increment_processed();
|
||||
RouterMetrics::record_processed_request(&selected_url);
|
||||
|
||||
return Some(selected_idx);
|
||||
}
|
||||
} else {
|
||||
model_id.to_string()
|
||||
};
|
||||
let tree = trees.entry(tree_key).or_insert_with(Tree::new);
|
||||
tree.insert(text, workers[selected_idx].url());
|
||||
// Selected worker no longer exists, remove it from tree
|
||||
tree.remove_tenant(&selected_url);
|
||||
debug!("Removed stale worker {} from cache tree", selected_url);
|
||||
}
|
||||
|
||||
// Increment processed counter
|
||||
workers[selected_idx].increment_processed();
|
||||
RouterMetrics::record_processed_request(workers[selected_idx].url());
|
||||
RouterMetrics::record_policy_decision(self.name(), workers[selected_idx].url());
|
||||
|
||||
return Some(selected_idx);
|
||||
// Fallback to first healthy worker
|
||||
return healthy_indices.first().copied();
|
||||
}
|
||||
|
||||
// Fallback to first healthy worker if tree operations fail
|
||||
|
||||
Reference in New Issue
Block a user