[router] change worker api to async instead of sync (#11566)

This commit is contained in:
Simo Lin
2025-10-14 03:32:21 -04:00
committed by GitHub
parent 0b9915c132
commit 4b62af92ef
8 changed files with 650 additions and 108 deletions

View File

@@ -0,0 +1,360 @@
//! Async job queue for control plane operations
//!
//! Provides non-blocking worker management by queuing operations and processing
//! them asynchronously in background worker tasks.
use crate::core::WorkerManager;
use crate::protocols::worker_spec::{JobStatus, WorkerConfigRequest};
use crate::server::AppContext;
use dashmap::DashMap;
use metrics::{counter, gauge, histogram};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
/// Job types for control plane operations
///
/// Each job targets exactly one worker, identified by its URL
/// (see `Job::worker_url`).
#[derive(Debug, Clone)]
pub enum Job {
    /// Register and asynchronously validate a new worker described by the
    /// config request (boxed — presumably to keep `Job` small since
    /// `WorkerConfigRequest` lives in `protocols::worker_spec`; confirm).
    AddWorker { config: Box<WorkerConfigRequest> },
    /// Remove the worker registered under this URL.
    RemoveWorker { url: String },
}
impl Job {
/// Get job type as string for logging
pub fn job_type(&self) -> &str {
match self {
Job::AddWorker { .. } => "AddWorker",
Job::RemoveWorker { .. } => "RemoveWorker",
}
}
/// Get worker URL for logging
pub fn worker_url(&self) -> &str {
match self {
Job::AddWorker { config } => &config.url,
Job::RemoveWorker { url } => url,
}
}
}
impl JobStatus {
    /// Seconds since the Unix epoch, used to timestamp every status record.
    ///
    /// Panics only if the system clock is set before the Unix epoch, which
    /// matches the original `unwrap()` behavior.
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock before UNIX epoch")
            .as_secs()
    }

    /// Shared constructor: build a status record with the given state label
    /// and optional message. Factored out because the three public states
    /// previously triplicated this construction and the timestamp logic.
    fn with_state(job_type: &str, worker_url: &str, status: &str, message: Option<String>) -> Self {
        Self {
            job_type: job_type.to_string(),
            worker_url: worker_url.to_string(),
            status: status.to_string(),
            message,
            timestamp: Self::now_secs(),
        }
    }

    /// Status for a job accepted into the queue but not yet picked up.
    fn pending(job_type: &str, worker_url: &str) -> Self {
        Self::with_state(job_type, worker_url, "pending", None)
    }

    /// Status for a job currently being executed by a worker task.
    fn processing(job_type: &str, worker_url: &str) -> Self {
        Self::with_state(job_type, worker_url, "processing", None)
    }

    /// Status for a failed job; `error` is surfaced to the status API.
    fn failed(job_type: &str, worker_url: &str, error: String) -> Self {
        Self::with_state(job_type, worker_url, "failed", Some(error))
    }
}
/// Job queue configuration
#[derive(Clone, Debug)]
pub struct JobQueueConfig {
    /// Maximum pending jobs in queue
    pub queue_capacity: usize,
    /// Number of worker tasks processing jobs
    pub worker_count: usize,
}

impl Default for JobQueueConfig {
    /// Defaults: room for 1000 queued jobs, drained by two worker tasks.
    fn default() -> Self {
        Self {
            queue_capacity: 1_000,
            worker_count: 2,
        }
    }
}
/// Job queue manager for worker validation and removal operations
///
/// Built via `JobQueue::new`, which also spawns the background worker tasks
/// that drain the queue and a cleanup task for stale statuses.
pub struct JobQueue {
    /// Channel for submitting jobs to the background worker tasks
    tx: mpsc::Sender<Job>,
    /// Weak reference to AppContext to avoid circular dependencies
    /// (the context holds the queue — see `worker_job_queue` in `execute_job`)
    context: Weak<AppContext>,
    /// Job status tracking by worker URL; shared with worker and cleanup
    /// tasks. Entries are removed on success and expire after a TTL.
    status_map: Arc<DashMap<String, JobStatus>>,
}
impl std::fmt::Debug for JobQueue {
    /// Manual `Debug`: the channel and context fields are not `Debug`, so
    /// only report how many job statuses are currently tracked.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tracked = self.status_map.len();
        f.debug_struct("JobQueue")
            .field("status_count", &tracked)
            .finish()
    }
}
impl JobQueue {
/// Create a new job queue with background workers (spawns tasks)
///
/// Takes a Weak reference to AppContext to avoid circular strong references.
/// Spawns background worker tasks that will process jobs asynchronously.
pub fn new(config: JobQueueConfig, context: Weak<AppContext>) -> Arc<Self> {
let (tx, rx) = mpsc::channel(config.queue_capacity);
info!(
"Initializing worker job queue: capacity={}, workers={}",
config.queue_capacity, config.worker_count
);
let rx = Arc::new(tokio::sync::Mutex::new(rx));
let status_map = Arc::new(DashMap::new());
let queue = Arc::new(Self {
tx,
context: context.clone(),
status_map: status_map.clone(),
});
for i in 0..config.worker_count {
let rx = Arc::clone(&rx);
let context = context.clone();
let status_map = status_map.clone();
tokio::spawn(async move {
Self::worker_loop(i, rx, context, status_map).await;
});
}
// Spawn cleanup task for old job statuses (TTL 5 minutes)
let cleanup_status_map = status_map.clone();
tokio::spawn(async move {
Self::cleanup_old_statuses(cleanup_status_map).await;
});
queue
}
/// Submit a job
pub async fn submit(&self, job: Job) -> Result<(), String> {
// Check if context is still alive before accepting jobs
if self.context.upgrade().is_none() {
counter!("sgl_router_job_shutdown_rejected_total").increment(1);
return Err("Job queue shutting down: AppContext dropped".to_string());
}
// Extract values before moving job
let job_type = job.job_type().to_string();
let worker_url = job.worker_url().to_string();
// Record pending status
self.status_map.insert(
worker_url.clone(),
JobStatus::pending(&job_type, &worker_url),
);
match self.tx.send(job).await {
Ok(_) => {
let queue_depth = self.tx.max_capacity() - self.tx.capacity();
gauge!("sgl_router_job_queue_depth").set(queue_depth as f64);
info!(
"Job submitted: type={}, worker={}, queue_depth={}",
job_type, worker_url, queue_depth
);
Ok(())
}
Err(_) => {
counter!("sgl_router_job_queue_full_total").increment(1);
// Remove status on failure
self.status_map.remove(&worker_url);
Err("Worker job queue full".to_string())
}
}
}
/// Get job status by worker URL
pub fn get_status(&self, worker_url: &str) -> Option<JobStatus> {
self.status_map.get(worker_url).map(|entry| entry.clone())
}
/// Remove job status (called when worker is deleted)
pub fn remove_status(&self, worker_url: &str) {
self.status_map.remove(worker_url);
}
/// Worker loop that processes jobs
async fn worker_loop(
worker_id: usize,
rx: Arc<tokio::sync::Mutex<mpsc::Receiver<Job>>>,
context: Weak<AppContext>,
status_map: Arc<DashMap<String, JobStatus>>,
) {
info!("Worker job queue worker {} started", worker_id);
loop {
// Lock the receiver and try to receive a job
let job = {
let mut rx_guard = rx.lock().await;
rx_guard.recv().await
};
match job {
Some(job) => {
let job_type = job.job_type().to_string();
let worker_url = job.worker_url().to_string();
let start = std::time::Instant::now();
// Update status to processing
status_map.insert(
worker_url.clone(),
JobStatus::processing(&job_type, &worker_url),
);
info!(
"Worker {} processing job: type={}, worker={}",
worker_id, job_type, worker_url
);
// Upgrade weak reference to process job
match context.upgrade() {
Some(ctx) => {
// Execute job
let result = Self::execute_job(&job, &ctx).await;
let duration = start.elapsed();
// Record metrics
histogram!("sgl_router_job_duration_seconds", "job_type" => job_type.clone())
.record(duration.as_secs_f64());
match result {
Ok(message) => {
counter!("sgl_router_job_success_total", "job_type" => job_type.clone())
.increment(1);
// Remove status on success - worker in registry is sufficient
status_map.remove(&worker_url);
info!(
"Worker {} completed job: type={}, worker={}, duration={:.3}s, result={}",
worker_id, job_type, worker_url, duration.as_secs_f64(), message
);
}
Err(error) => {
counter!("sgl_router_job_failure_total", "job_type" => job_type.clone())
.increment(1);
// Keep failed status for API to report error details
status_map.insert(
worker_url.clone(),
JobStatus::failed(&job_type, &worker_url, error.clone()),
);
warn!(
"Worker {} failed job: type={}, worker={}, duration={:.3}s, error={}",
worker_id, job_type, worker_url, duration.as_secs_f64(), error
);
}
}
}
None => {
let error_msg = "AppContext dropped".to_string();
status_map.insert(
worker_url.clone(),
JobStatus::failed(&job_type, &worker_url, error_msg),
);
error!(
"Worker {}: AppContext dropped, cannot process job: type={}, worker={}",
worker_id, job_type, worker_url
);
break;
}
}
}
None => {
warn!(
"Worker job queue worker {} channel closed, stopping",
worker_id
);
break;
}
}
}
warn!("Worker job queue worker {} stopped", worker_id);
}
/// Execute a specific job
async fn execute_job(job: &Job, context: &Arc<AppContext>) -> Result<String, String> {
match job {
Job::AddWorker { config } => {
// Register worker with is_healthy=false
let worker =
WorkerManager::add_worker_from_config(config.as_ref(), context).await?;
// Validate and activate
WorkerManager::validate_and_activate_worker(&worker, context).await
}
Job::RemoveWorker { url } => {
let result = WorkerManager::remove_worker(url, context);
// Clean up job status when removing worker
if let Some(queue) = context.worker_job_queue.get() {
queue.remove_status(url);
}
result
}
}
}
/// Cleanup old job statuses (TTL 5 minutes)
async fn cleanup_old_statuses(status_map: Arc<DashMap<String, JobStatus>>) {
const CLEANUP_INTERVAL: Duration = Duration::from_secs(60); // Run every minute
const STATUS_TTL: u64 = 300; // 5 minutes in seconds
loop {
tokio::time::sleep(CLEANUP_INTERVAL).await;
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
// Remove statuses older than TTL
status_map.retain(|_key, value| now - value.timestamp < STATUS_TTL);
debug!(
"Cleaned up old job statuses, remaining: {}",
status_map.len()
);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config must match the documented values: 1000 queued
    /// jobs, two worker tasks.
    #[test]
    fn test_job_queue_config_default() {
        let JobQueueConfig {
            queue_capacity,
            worker_count,
        } = JobQueueConfig::default();
        assert_eq!(queue_capacity, 1000);
        assert_eq!(worker_count, 2);
    }
}

View File

@@ -8,6 +8,7 @@
pub mod circuit_breaker;
pub mod error;
pub mod job_queue;
pub mod retry;
pub mod token_bucket;
pub mod worker;
@@ -19,10 +20,11 @@ pub use circuit_breaker::{
CircuitBreaker, CircuitBreakerConfig, CircuitBreakerStats, CircuitState,
};
pub use error::{WorkerError, WorkerResult};
pub use job_queue::{Job, JobQueue, JobQueueConfig};
pub use retry::{is_retryable_status, BackoffCalculator, RetryError, RetryExecutor};
pub use worker::{
start_health_checker, BasicWorker, ConnectionMode, DPAwareWorker, HealthChecker, HealthConfig,
Worker, WorkerFactory, WorkerLoadGuard, WorkerType,
start_health_checker, worker_to_info, BasicWorker, ConnectionMode, DPAwareWorker,
HealthChecker, HealthConfig, Worker, WorkerFactory, WorkerLoadGuard, WorkerType,
};
pub use worker_builder::{BasicWorkerBuilder, DPAwareWorkerBuilder};
pub use worker_manager::{DpInfo, LoadMonitor, ServerInfo, WorkerManager};

View File

@@ -3,6 +3,7 @@ use crate::core::CircuitState;
use crate::core::{BasicWorkerBuilder, DPAwareWorkerBuilder};
use crate::grpc_client::SglangSchedulerClient;
use crate::metrics::RouterMetrics;
use crate::protocols::worker_spec::WorkerInfo;
use async_trait::async_trait;
use futures;
use serde_json;
@@ -974,6 +975,39 @@ pub fn start_health_checker(
HealthChecker { handle, shutdown }
}
/// Helper to convert Worker trait object to WorkerInfo struct
///
/// Produces a plain-data snapshot of the worker's current state; `job_status`
/// is always `None` here and is filled in by callers that track jobs.
pub fn worker_to_info(worker: &Arc<dyn Worker>) -> WorkerInfo {
    // Derive both the type label and the bootstrap port from a single
    // inspection of the worker type (only Prefill carries a port).
    let (worker_type_str, bootstrap_port) = match worker.worker_type() {
        WorkerType::Regular => ("regular", None),
        WorkerType::Prefill { bootstrap_port } => ("prefill", bootstrap_port),
        WorkerType::Decode => ("decode", None),
    };
    WorkerInfo {
        // The worker's URL doubles as its identifier.
        id: worker.url().to_string(),
        url: worker.url().to_string(),
        model_id: worker.model_id().to_string(),
        priority: worker.priority(),
        cost: worker.cost(),
        worker_type: worker_type_str.to_string(),
        is_healthy: worker.is_healthy(),
        load: worker.load(),
        connection_mode: format!("{:?}", worker.connection_mode()),
        tokenizer_path: worker.tokenizer_path().map(String::from),
        reasoning_parser: worker.reasoning_parser().map(String::from),
        tool_parser: worker.tool_parser().map(String::from),
        chat_template: worker.chat_template().map(String::from),
        bootstrap_port,
        metadata: worker.metadata().labels.clone(),
        job_status: None,
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -551,31 +551,23 @@ impl WorkerManager {
}
/// Add a worker from a configuration request
///
/// Registers worker immediately with healthy=false, returns worker for async validation
pub async fn add_worker_from_config(
config: &WorkerConfigRequest,
context: &AppContext,
) -> Result<String, String> {
) -> Result<Arc<dyn Worker>, String> {
// Check if worker already exists
if context.worker_registry.get_by_url(&config.url).is_some() {
return Err(format!("Worker {} already exists", config.url));
}
let mut labels = config.labels.clone();
let model_id = if let Some(ref model_id) = config.model_id {
model_id.clone()
} else {
match Self::get_server_info(&config.url, config.api_key.as_deref()).await {
Ok(info) => info
.model_id
.or_else(|| {
info.model_path
.as_ref()
.and_then(|path| path.split('/').next_back().map(|s| s.to_string()))
})
.unwrap_or_else(|| "unknown".to_string()),
Err(e) => {
warn!("Failed to query server info from {}: {}", config.url, e);
"unknown".to_string()
}
}
};
// Use provided model_id or default to "unknown"
let model_id = config
.model_id
.clone()
.unwrap_or_else(|| "unknown".to_string());
labels.insert("model_id".to_string(), model_id.clone());
if let Some(priority) = config.priority {
labels.insert("priority".to_string(), priority.to_string());
@@ -614,18 +606,54 @@ impl WorkerManager {
ConnectionMode::Http
};
let policy_hint = labels.get("policy").cloned();
let circuit_breaker_config = Self::convert_circuit_breaker_config(
&context.router_config.effective_circuit_breaker_config(),
);
let health_config = Self::convert_health_config(&context.router_config.health_check);
Self::add_worker_internal(
&config.url,
// Create and register worker (starts with healthy=false)
let worker = Self::create_basic_worker(
config.url.clone(),
worker_type,
connection_mode,
config.api_key.clone(),
Some(labels),
policy_hint.as_deref(),
context,
)
.await
Some(labels.clone()),
circuit_breaker_config,
health_config,
);
worker.set_healthy(false);
context.worker_registry.register(worker.clone());
let policy_hint = labels.get("policy").map(|s| s.as_str());
context
.policy_registry
.on_worker_added(&model_id, policy_hint);
info!("Registered worker {} (initializing)", config.url);
// Return worker for async validation
Ok(worker)
}
/// Validate and activate a worker (for async validation after registration)
///
/// Runs the health check for a worker that was registered with
/// `is_healthy=false`; on success flips it to healthy via `set_healthy(true)`
/// so it can start serving. The time budget comes from the router config's
/// `worker_startup_timeout_secs` — exact timeout/retry semantics are owned by
/// `WorkerFactory::validate_health` (not visible here; confirm there).
///
/// # Errors
/// Returns a formatted error string if the health check fails or times out;
/// the worker is left registered but unhealthy in that case.
pub async fn validate_and_activate_worker(
    worker: &Arc<dyn Worker>,
    context: &AppContext,
) -> Result<String, String> {
    let url = worker.url();
    // Perform health validation
    WorkerFactory::validate_health(url, context.router_config.worker_startup_timeout_secs)
        .await
        .map_err(|e| format!("Health check failed for {}: {}", url, e))?;
    // Mark as healthy
    worker.set_healthy(true);
    info!("Worker {} validated and activated", url);
    Ok(format!("Worker {} is now healthy", url))
}
/// Add a worker from URL (legacy endpoint)