[router] clean up lint warnings with clippy execution (#9201)
This commit is contained in:
@@ -352,7 +352,7 @@ impl RouterConfig {
|
|||||||
|
|
||||||
/// Check if service discovery is enabled
|
/// Check if service discovery is enabled
|
||||||
pub fn has_service_discovery(&self) -> bool {
|
pub fn has_service_discovery(&self) -> bool {
|
||||||
self.discovery.as_ref().map_or(false, |d| d.enabled)
|
self.discovery.as_ref().is_some_and(|d| d.enabled)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if metrics are enabled
|
/// Check if metrics are enabled
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ impl ConfigValidator {
|
|||||||
/// Validate a complete router configuration
|
/// Validate a complete router configuration
|
||||||
pub fn validate(config: &RouterConfig) -> ConfigResult<()> {
|
pub fn validate(config: &RouterConfig) -> ConfigResult<()> {
|
||||||
// Check if service discovery is enabled
|
// Check if service discovery is enabled
|
||||||
let has_service_discovery = config.discovery.as_ref().map_or(false, |d| d.enabled);
|
let has_service_discovery = config.discovery.as_ref().is_some_and(|d| d.enabled);
|
||||||
|
|
||||||
Self::validate_mode(&config.mode, has_service_discovery)?;
|
Self::validate_mode(&config.mode, has_service_discovery)?;
|
||||||
Self::validate_policy(&config.policy)?;
|
Self::validate_policy(&config.policy)?;
|
||||||
@@ -348,7 +348,7 @@ impl ConfigValidator {
|
|||||||
// No mode/policy restrictions needed anymore
|
// No mode/policy restrictions needed anymore
|
||||||
|
|
||||||
// Check if service discovery is enabled for worker count validation
|
// Check if service discovery is enabled for worker count validation
|
||||||
let has_service_discovery = config.discovery.as_ref().map_or(false, |d| d.enabled);
|
let has_service_discovery = config.discovery.as_ref().is_some_and(|d| d.enabled);
|
||||||
|
|
||||||
// Only validate worker counts if service discovery is disabled
|
// Only validate worker counts if service discovery is disabled
|
||||||
if !has_service_discovery {
|
if !has_service_discovery {
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ impl BackoffCalculator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Apply jitter in range [-j, +j]
|
// Apply jitter in range [-j, +j]
|
||||||
let jitter = config.jitter_factor.max(0.0).min(1.0);
|
let jitter = config.jitter_factor.clamp(0.0, 1.0);
|
||||||
if jitter > 0.0 {
|
if jitter > 0.0 {
|
||||||
let mut rng = rand::rng();
|
let mut rng = rand::rng();
|
||||||
let jitter_scale: f32 = rng.random_range(-jitter..=jitter);
|
let jitter_scale: f32 = rng.random_range(-jitter..=jitter);
|
||||||
|
|||||||
@@ -606,7 +606,7 @@ impl WorkerFactory {
|
|||||||
|
|
||||||
/// Get DP size from a worker
|
/// Get DP size from a worker
|
||||||
async fn get_worker_dp_size(url: &str, api_key: &Option<String>) -> WorkerResult<usize> {
|
async fn get_worker_dp_size(url: &str, api_key: &Option<String>) -> WorkerResult<usize> {
|
||||||
let mut req_builder = WORKER_CLIENT.get(&format!("{}/get_server_info", url));
|
let mut req_builder = WORKER_CLIENT.get(format!("{}/get_server_info", url));
|
||||||
|
|
||||||
if let Some(key) = api_key {
|
if let Some(key) = api_key {
|
||||||
req_builder = req_builder.bearer_auth(key);
|
req_builder = req_builder.bearer_auth(key);
|
||||||
|
|||||||
@@ -467,10 +467,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
// Skip --prefill and its URL
|
// Skip --prefill and its URL
|
||||||
i += 2;
|
i += 2;
|
||||||
// Also skip bootstrap port if present
|
// Also skip bootstrap port if present
|
||||||
if i < raw_args.len() && !raw_args[i].starts_with("--") {
|
if i < raw_args.len()
|
||||||
if raw_args[i].parse::<u16>().is_ok() || raw_args[i].to_lowercase() == "none" {
|
&& !raw_args[i].starts_with("--")
|
||||||
i += 1;
|
&& (raw_args[i].parse::<u16>().is_ok() || raw_args[i].to_lowercase() == "none")
|
||||||
}
|
{
|
||||||
|
i += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
filtered_args.push(raw_args[i].clone());
|
filtered_args.push(raw_args[i].clone());
|
||||||
|
|||||||
@@ -211,7 +211,7 @@ impl<B> OnRequest<B> for RequestLogger {
|
|||||||
// Try to get the request ID from extensions
|
// Try to get the request ID from extensions
|
||||||
// This will work if RequestIdLayer has already run
|
// This will work if RequestIdLayer has already run
|
||||||
if let Some(request_id) = request.extensions().get::<RequestId>() {
|
if let Some(request_id) = request.extensions().get::<RequestId>() {
|
||||||
span.record("request_id", &request_id.0.as_str());
|
span.record("request_id", request_id.0.as_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't log here - we already log in RequestIdService with the proper request_id
|
// Don't log here - we already log in RequestIdService with the proper request_id
|
||||||
|
|||||||
@@ -232,7 +232,7 @@ impl PDRouter {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Check if already exists
|
// Check if already exists
|
||||||
if workers.iter().any(|w| w.url() == &url) {
|
if workers.iter().any(|w| w.url() == url) {
|
||||||
return Err(PDRouterError::WorkerAlreadyExists { url: url.clone() });
|
return Err(PDRouterError::WorkerAlreadyExists { url: url.clone() });
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -271,7 +271,7 @@ impl PDRouter {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Check if already exists
|
// Check if already exists
|
||||||
if workers.iter().any(|w| w.url() == &url) {
|
if workers.iter().any(|w| w.url() == url) {
|
||||||
return Err(PDRouterError::WorkerAlreadyExists { url: url.clone() });
|
return Err(PDRouterError::WorkerAlreadyExists { url: url.clone() });
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -532,11 +532,9 @@ impl PDRouter {
|
|||||||
// Helper to determine batch size from a GenerateRequest
|
// Helper to determine batch size from a GenerateRequest
|
||||||
fn get_generate_batch_size(req: &GenerateRequest) -> Option<usize> {
|
fn get_generate_batch_size(req: &GenerateRequest) -> Option<usize> {
|
||||||
// Check prompt array
|
// Check prompt array
|
||||||
if let Some(prompt) = &req.prompt {
|
if let Some(crate::openai_api_types::StringOrArray::Array(arr)) = &req.prompt {
|
||||||
if let crate::openai_api_types::StringOrArray::Array(arr) = prompt {
|
if !arr.is_empty() {
|
||||||
if !arr.is_empty() {
|
return Some(arr.len());
|
||||||
return Some(arr.len());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Check text array
|
// Check text array
|
||||||
@@ -978,14 +976,14 @@ impl PDRouter {
|
|||||||
|
|
||||||
// Select workers using helper function
|
// Select workers using helper function
|
||||||
let prefill = Self::pick_worker_by_policy(
|
let prefill = Self::pick_worker_by_policy(
|
||||||
&*prefill_workers,
|
&prefill_workers,
|
||||||
&*self.prefill_policy,
|
&*self.prefill_policy,
|
||||||
request_text,
|
request_text,
|
||||||
"prefill",
|
"prefill",
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let decode = Self::pick_worker_by_policy(
|
let decode = Self::pick_worker_by_policy(
|
||||||
&*decode_workers,
|
&decode_workers,
|
||||||
&*self.decode_policy,
|
&*self.decode_policy,
|
||||||
request_text,
|
request_text,
|
||||||
"decode",
|
"decode",
|
||||||
@@ -1488,7 +1486,7 @@ impl RouterTrait for PDRouter {
|
|||||||
let (prefill_result, decode_result) = tokio::join!(
|
let (prefill_result, decode_result) = tokio::join!(
|
||||||
self.client.get(&prefill_url).send(),
|
self.client.get(&prefill_url).send(),
|
||||||
self.client
|
self.client
|
||||||
.get(&format!("{}/health_generate", decode.url()))
|
.get(format!("{}/health_generate", decode.url()))
|
||||||
.send()
|
.send()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -276,7 +276,7 @@ impl Router {
|
|||||||
|
|
||||||
fn get_worker_dp_size(worker_url: &str, api_key: &Option<String>) -> Result<usize, String> {
|
fn get_worker_dp_size(worker_url: &str, api_key: &Option<String>) -> Result<usize, String> {
|
||||||
let sync_client = reqwest::blocking::Client::new();
|
let sync_client = reqwest::blocking::Client::new();
|
||||||
let mut req_builder = sync_client.get(&format!("{}/get_server_info", worker_url));
|
let mut req_builder = sync_client.get(format!("{}/get_server_info", worker_url));
|
||||||
if let Some(key) = api_key {
|
if let Some(key) = api_key {
|
||||||
req_builder = req_builder.bearer_auth(key);
|
req_builder = req_builder.bearer_auth(key);
|
||||||
}
|
}
|
||||||
@@ -628,7 +628,7 @@ impl Router {
|
|||||||
if let Ok(workers_guard) = self.workers.read() {
|
if let Ok(workers_guard) = self.workers.read() {
|
||||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
||||||
worker.decrement_load();
|
worker.decrement_load();
|
||||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
RouterMetrics::set_running_requests(worker_url, worker.load());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -659,7 +659,7 @@ impl Router {
|
|||||||
if let Ok(workers_guard) = self.workers.read() {
|
if let Ok(workers_guard) = self.workers.read() {
|
||||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
||||||
worker.decrement_load();
|
worker.decrement_load();
|
||||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
RouterMetrics::set_running_requests(worker_url, worker.load());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -688,7 +688,7 @@ impl Router {
|
|||||||
{
|
{
|
||||||
if let Ok(workers_guard) = workers.read() {
|
if let Ok(workers_guard) = workers.read() {
|
||||||
if let Some(worker) =
|
if let Some(worker) =
|
||||||
workers_guard.iter().find(|w| w.url() == &worker_url)
|
workers_guard.iter().find(|w| w.url() == worker_url)
|
||||||
{
|
{
|
||||||
worker.decrement_load();
|
worker.decrement_load();
|
||||||
RouterMetrics::set_running_requests(
|
RouterMetrics::set_running_requests(
|
||||||
@@ -711,8 +711,7 @@ impl Router {
|
|||||||
}
|
}
|
||||||
if !decremented {
|
if !decremented {
|
||||||
if let Ok(workers_guard) = workers.read() {
|
if let Ok(workers_guard) = workers.read() {
|
||||||
if let Some(worker) = workers_guard.iter().find(|w| w.url() == &worker_url)
|
if let Some(worker) = workers_guard.iter().find(|w| w.url() == worker_url) {
|
||||||
{
|
|
||||||
worker.decrement_load();
|
worker.decrement_load();
|
||||||
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
RouterMetrics::set_running_requests(&worker_url, worker.load());
|
||||||
}
|
}
|
||||||
@@ -783,7 +782,7 @@ impl Router {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
match client.get(&format!("{}/health", worker_url)).send().await {
|
match client.get(format!("{}/health", worker_url)).send().await {
|
||||||
Ok(res) => {
|
Ok(res) => {
|
||||||
if res.status().is_success() {
|
if res.status().is_success() {
|
||||||
let mut workers_guard = self.workers.write().unwrap();
|
let mut workers_guard = self.workers.write().unwrap();
|
||||||
@@ -953,7 +952,7 @@ impl Router {
|
|||||||
|
|
||||||
match self
|
match self
|
||||||
.client
|
.client
|
||||||
.get(&format!("{}/get_load", worker_url))
|
.get(format!("{}/get_load", worker_url))
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@@ -1036,7 +1035,7 @@ impl Router {
|
|||||||
worker_url
|
worker_url
|
||||||
};
|
};
|
||||||
|
|
||||||
match client.get(&format!("{}/get_load", worker_url)).send().await {
|
match client.get(format!("{}/get_load", worker_url)).send().await {
|
||||||
Ok(res) if res.status().is_success() => match res.bytes().await {
|
Ok(res) if res.status().is_success() => match res.bytes().await {
|
||||||
Ok(bytes) => match serde_json::from_slice::<serde_json::Value>(&bytes) {
|
Ok(bytes) => match serde_json::from_slice::<serde_json::Value>(&bytes) {
|
||||||
Ok(data) => data
|
Ok(data) => data
|
||||||
|
|||||||
@@ -74,11 +74,10 @@ impl PodInfo {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
pod.metadata.labels.as_ref().map_or(false, |labels| {
|
pod.metadata
|
||||||
selector
|
.labels
|
||||||
.iter()
|
.as_ref()
|
||||||
.all(|(k, v)| labels.get(k).map_or(false, |label_value| label_value == v))
|
.is_some_and(|labels| selector.iter().all(|(k, v)| labels.get(k) == Some(v)))
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if a pod should be included in service discovery
|
/// Check if a pod should be included in service discovery
|
||||||
|
|||||||
@@ -74,13 +74,19 @@ fn shared_prefix_count(a: &str, b: &str) -> usize {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return i;
|
i
|
||||||
}
|
}
|
||||||
|
|
||||||
fn slice_by_chars(s: &str, start: usize, end: usize) -> String {
|
fn slice_by_chars(s: &str, start: usize, end: usize) -> String {
|
||||||
s.chars().skip(start).take(end - start).collect()
|
s.chars().skip(start).take(end - start).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Default for Tree {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Tree {
|
impl Tree {
|
||||||
/*
|
/*
|
||||||
Thread-safe multi tenant radix tree
|
Thread-safe multi tenant radix tree
|
||||||
@@ -517,7 +523,7 @@ impl Tree {
|
|||||||
// add parent to queue if it becomes a leaf
|
// add parent to queue if it becomes a leaf
|
||||||
if let Some(parent) = curr.parent.read().unwrap().as_ref() {
|
if let Some(parent) = curr.parent.read().unwrap().as_ref() {
|
||||||
if Tree::leaf_of(parent).contains(&tenant.to_string()) {
|
if Tree::leaf_of(parent).contains(&tenant.to_string()) {
|
||||||
queue.push_back(Arc::clone(&parent));
|
queue.push_back(Arc::clone(parent));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -653,8 +659,6 @@ impl Tree {
|
|||||||
}
|
}
|
||||||
|
|
||||||
println!("{result}");
|
println!("{result}");
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user