305 lines
10 KiB
Rust
305 lines
10 KiB
Rust
use pyo3::prelude::*;
|
|
pub mod config;
|
|
pub mod logging;
|
|
use std::collections::HashMap;
|
|
pub mod openai_api_types;
|
|
pub mod pd_router;
|
|
pub mod pd_types;
|
|
pub mod prometheus;
|
|
pub mod request_adapter;
|
|
pub mod router;
|
|
pub mod server;
|
|
pub mod service_discovery;
|
|
pub mod tree;
|
|
use crate::prometheus::PrometheusConfig;
|
|
|
|
#[pyclass(eq)]
|
|
#[derive(Clone, PartialEq, Debug)]
|
|
pub enum PolicyType {
|
|
Random,
|
|
RoundRobin,
|
|
CacheAware,
|
|
PowerOfTwo, // Moved from PD-specific, now shared
|
|
}
|
|
|
|
#[pyclass]
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
struct Router {
|
|
host: String,
|
|
port: u16,
|
|
worker_urls: Vec<String>,
|
|
policy: PolicyType,
|
|
worker_startup_timeout_secs: u64,
|
|
worker_startup_check_interval: u64,
|
|
cache_threshold: f32,
|
|
balance_abs_threshold: usize,
|
|
balance_rel_threshold: f32,
|
|
eviction_interval_secs: u64,
|
|
max_tree_size: usize,
|
|
max_payload_size: usize,
|
|
log_dir: Option<String>,
|
|
log_level: Option<String>,
|
|
service_discovery: bool,
|
|
selector: HashMap<String, String>,
|
|
service_discovery_port: u16,
|
|
service_discovery_namespace: Option<String>,
|
|
// PD service discovery fields
|
|
prefill_selector: HashMap<String, String>,
|
|
decode_selector: HashMap<String, String>,
|
|
bootstrap_port_annotation: String,
|
|
prometheus_port: Option<u16>,
|
|
prometheus_host: Option<String>,
|
|
request_timeout_secs: u64,
|
|
// PD mode flag
|
|
pd_disaggregation: bool,
|
|
// PD-specific fields (only used when pd_disaggregation is true)
|
|
prefill_urls: Option<Vec<(String, Option<u16>)>>,
|
|
decode_urls: Option<Vec<String>>,
|
|
}
|
|
|
|
impl Router {
|
|
/// Convert PyO3 Router to RouterConfig
|
|
pub fn to_router_config(&self) -> config::ConfigResult<config::RouterConfig> {
|
|
use config::{
|
|
DiscoveryConfig, MetricsConfig, PolicyConfig as ConfigPolicyConfig, RoutingMode,
|
|
};
|
|
|
|
// Determine routing mode
|
|
let mode = if self.pd_disaggregation {
|
|
RoutingMode::PrefillDecode {
|
|
prefill_urls: self.prefill_urls.clone().unwrap_or_default(),
|
|
decode_urls: self.decode_urls.clone().unwrap_or_default(),
|
|
}
|
|
} else {
|
|
RoutingMode::Regular {
|
|
worker_urls: self.worker_urls.clone(),
|
|
}
|
|
};
|
|
|
|
// Convert policy
|
|
let policy = match self.policy {
|
|
PolicyType::Random => ConfigPolicyConfig::Random,
|
|
PolicyType::RoundRobin => ConfigPolicyConfig::RoundRobin,
|
|
PolicyType::CacheAware => ConfigPolicyConfig::CacheAware {
|
|
cache_threshold: self.cache_threshold,
|
|
balance_abs_threshold: self.balance_abs_threshold,
|
|
balance_rel_threshold: self.balance_rel_threshold,
|
|
eviction_interval_secs: self.eviction_interval_secs,
|
|
max_tree_size: self.max_tree_size,
|
|
},
|
|
PolicyType::PowerOfTwo => ConfigPolicyConfig::PowerOfTwo {
|
|
load_check_interval_secs: 5, // Default value
|
|
},
|
|
};
|
|
|
|
// Service discovery configuration
|
|
let discovery = if self.service_discovery {
|
|
Some(DiscoveryConfig {
|
|
enabled: true,
|
|
namespace: self.service_discovery_namespace.clone(),
|
|
port: self.service_discovery_port,
|
|
check_interval_secs: 60,
|
|
selector: self.selector.clone(),
|
|
prefill_selector: self.prefill_selector.clone(),
|
|
decode_selector: self.decode_selector.clone(),
|
|
bootstrap_port_annotation: self.bootstrap_port_annotation.clone(),
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
// Metrics configuration
|
|
let metrics = match (self.prometheus_port, self.prometheus_host.as_ref()) {
|
|
(Some(port), Some(host)) => Some(MetricsConfig {
|
|
port,
|
|
host: host.clone(),
|
|
}),
|
|
_ => None,
|
|
};
|
|
|
|
Ok(config::RouterConfig {
|
|
mode,
|
|
policy,
|
|
host: self.host.clone(),
|
|
port: self.port,
|
|
max_payload_size: self.max_payload_size,
|
|
request_timeout_secs: self.request_timeout_secs,
|
|
worker_startup_timeout_secs: self.worker_startup_timeout_secs,
|
|
worker_startup_check_interval_secs: self.worker_startup_check_interval,
|
|
discovery,
|
|
metrics,
|
|
log_dir: self.log_dir.clone(),
|
|
log_level: self.log_level.clone(),
|
|
})
|
|
}
|
|
}
|
|
|
|
#[pymethods]
|
|
impl Router {
|
|
#[new]
|
|
#[pyo3(signature = (
|
|
worker_urls,
|
|
policy = PolicyType::RoundRobin,
|
|
host = String::from("127.0.0.1"),
|
|
port = 3001,
|
|
worker_startup_timeout_secs = 300,
|
|
worker_startup_check_interval = 10,
|
|
cache_threshold = 0.50,
|
|
balance_abs_threshold = 32,
|
|
balance_rel_threshold = 1.0001,
|
|
eviction_interval_secs = 60,
|
|
max_tree_size = 2usize.pow(24),
|
|
max_payload_size = 256 * 1024 * 1024, // 256MB default for large batches
|
|
log_dir = None,
|
|
log_level = None,
|
|
service_discovery = false,
|
|
selector = HashMap::new(),
|
|
service_discovery_port = 80,
|
|
service_discovery_namespace = None,
|
|
prefill_selector = HashMap::new(),
|
|
decode_selector = HashMap::new(),
|
|
bootstrap_port_annotation = String::from("sglang.ai/bootstrap-port"),
|
|
prometheus_port = None,
|
|
prometheus_host = None,
|
|
request_timeout_secs = 600, // Add configurable request timeout
|
|
pd_disaggregation = false, // New flag for PD mode
|
|
prefill_urls = None,
|
|
decode_urls = None
|
|
))]
|
|
fn new(
|
|
worker_urls: Vec<String>,
|
|
policy: PolicyType,
|
|
host: String,
|
|
port: u16,
|
|
worker_startup_timeout_secs: u64,
|
|
worker_startup_check_interval: u64,
|
|
cache_threshold: f32,
|
|
balance_abs_threshold: usize,
|
|
balance_rel_threshold: f32,
|
|
eviction_interval_secs: u64,
|
|
max_tree_size: usize,
|
|
max_payload_size: usize,
|
|
log_dir: Option<String>,
|
|
log_level: Option<String>,
|
|
service_discovery: bool,
|
|
selector: HashMap<String, String>,
|
|
service_discovery_port: u16,
|
|
service_discovery_namespace: Option<String>,
|
|
prefill_selector: HashMap<String, String>,
|
|
decode_selector: HashMap<String, String>,
|
|
bootstrap_port_annotation: String,
|
|
prometheus_port: Option<u16>,
|
|
prometheus_host: Option<String>,
|
|
request_timeout_secs: u64,
|
|
pd_disaggregation: bool,
|
|
prefill_urls: Option<Vec<(String, Option<u16>)>>,
|
|
decode_urls: Option<Vec<String>>,
|
|
) -> PyResult<Self> {
|
|
Ok(Router {
|
|
host,
|
|
port,
|
|
worker_urls,
|
|
policy,
|
|
worker_startup_timeout_secs,
|
|
worker_startup_check_interval,
|
|
cache_threshold,
|
|
balance_abs_threshold,
|
|
balance_rel_threshold,
|
|
eviction_interval_secs,
|
|
max_tree_size,
|
|
max_payload_size,
|
|
log_dir,
|
|
log_level,
|
|
service_discovery,
|
|
selector,
|
|
service_discovery_port,
|
|
service_discovery_namespace,
|
|
prefill_selector,
|
|
decode_selector,
|
|
bootstrap_port_annotation,
|
|
prometheus_port,
|
|
prometheus_host,
|
|
request_timeout_secs,
|
|
pd_disaggregation,
|
|
prefill_urls,
|
|
decode_urls,
|
|
})
|
|
}
|
|
|
|
fn start(&self) -> PyResult<()> {
|
|
// Convert to RouterConfig and validate
|
|
let router_config = self.to_router_config().map_err(|e| {
|
|
pyo3::exceptions::PyValueError::new_err(format!("Configuration error: {}", e))
|
|
})?;
|
|
|
|
// Validate the configuration
|
|
router_config.validate().map_err(|e| {
|
|
pyo3::exceptions::PyValueError::new_err(format!(
|
|
"Configuration validation failed: {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
// Convert to internal policy config
|
|
let policy_config = router_config
|
|
.to_routing_policy_config()
|
|
.map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))?;
|
|
|
|
// Create service discovery config if enabled
|
|
let service_discovery_config = if self.service_discovery {
|
|
Some(service_discovery::ServiceDiscoveryConfig {
|
|
enabled: true,
|
|
selector: self.selector.clone(),
|
|
check_interval: std::time::Duration::from_secs(60),
|
|
port: self.service_discovery_port,
|
|
namespace: self.service_discovery_namespace.clone(),
|
|
// PD mode configuration
|
|
pd_mode: self.pd_disaggregation,
|
|
prefill_selector: self.prefill_selector.clone(),
|
|
decode_selector: self.decode_selector.clone(),
|
|
bootstrap_port_annotation: self.bootstrap_port_annotation.clone(),
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
// Create Prometheus config if enabled
|
|
let prometheus_config = Some(PrometheusConfig {
|
|
port: self.prometheus_port.unwrap_or(29000),
|
|
host: self
|
|
.prometheus_host
|
|
.clone()
|
|
.unwrap_or_else(|| "127.0.0.1".to_string()),
|
|
});
|
|
|
|
// Use tokio runtime instead of actix-web System for better compatibility
|
|
let runtime = tokio::runtime::Runtime::new()
|
|
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))?;
|
|
|
|
// Block on the async startup function
|
|
runtime.block_on(async move {
|
|
server::startup(server::ServerConfig {
|
|
host: self.host.clone(),
|
|
port: self.port,
|
|
worker_urls: self.worker_urls.clone(),
|
|
policy_config,
|
|
max_payload_size: self.max_payload_size,
|
|
log_dir: self.log_dir.clone(),
|
|
log_level: self.log_level.clone(),
|
|
service_discovery_config,
|
|
prometheus_config,
|
|
request_timeout_secs: self.request_timeout_secs,
|
|
})
|
|
.await
|
|
.map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(e.to_string()))
|
|
})
|
|
}
|
|
}
|
|
|
|
#[pymodule]
|
|
fn sglang_router_rs(m: &Bound<'_, PyModule>) -> PyResult<()> {
|
|
m.add_class::<PolicyType>()?;
|
|
m.add_class::<Router>()?;
|
|
Ok(())
|
|
}
|