diff --git a/sgl-router/src/core/worker.rs b/sgl-router/src/core/worker.rs index b091afa26..62ee37a5d 100644 --- a/sgl-router/src/core/worker.rs +++ b/sgl-router/src/core/worker.rs @@ -804,7 +804,7 @@ impl WorkerFactory { } } - tokio::time::sleep(Duration::from_secs(1)).await; + time::sleep(Duration::from_secs(1)).await; } } } @@ -900,7 +900,7 @@ pub fn start_health_checker( let shutdown_clone = shutdown.clone(); let handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(check_interval_secs)); + let mut interval = time::interval(Duration::from_secs(check_interval_secs)); // Counter for periodic load reset (every 10 health check cycles) let mut check_count = 0u64; @@ -1272,7 +1272,7 @@ mod tests { let worker_clone = Arc::clone(&worker); let handle = tokio::spawn(async move { worker_clone.set_healthy(i % 2 == 0); - tokio::time::sleep(Duration::from_micros(10)).await; + time::sleep(Duration::from_micros(10)).await; }); handles.push(handle); } diff --git a/sgl-router/src/core/worker_manager.rs b/sgl-router/src/core/worker_manager.rs index 0d4185440..9a91014f4 100644 --- a/sgl-router/src/core/worker_manager.rs +++ b/sgl-router/src/core/worker_manager.rs @@ -1180,7 +1180,7 @@ impl WorkerManager { }); } - let results = futures::future::join_all(tasks).await; + let results = future::join_all(tasks).await; let mut successful = Vec::new(); let mut failed = Vec::new(); @@ -1321,7 +1321,7 @@ impl WorkerManager { }); } - let loads = futures::future::join_all(tasks).await; + let loads = future::join_all(tasks).await; let successful = loads.iter().filter(|l| l.load >= 0).count(); let failed = loads.iter().filter(|l| l.load < 0).count(); diff --git a/sgl-router/src/core/worker_registry.rs b/sgl-router/src/core/worker_registry.rs index 9580f3659..e4d65a491 100644 --- a/sgl-router/src/core/worker_registry.rs +++ b/sgl-router/src/core/worker_registry.rs @@ -388,7 +388,7 @@ impl WorkerRegistry { } // Get all workers from registry - let workers: Vec> = workers_ref + let workers: Vec> = workers_ref .iter() .map(|entry| entry.value().clone()) .collect(); diff --git a/sgl-router/src/data_connector/responses.rs b/sgl-router/src/data_connector/responses.rs index 7a4277183..e0420f3c6 100644 --- a/sgl-router/src/data_connector/responses.rs +++ b/sgl-router/src/data_connector/responses.rs @@ -51,10 +51,10 @@ pub struct StoredResponse { pub output: String, /// Tool calls made by the model (if any) - pub tool_calls: Vec, + pub tool_calls: Vec, /// Custom metadata - pub metadata: HashMap, + pub metadata: HashMap, /// When this response was created pub created_at: chrono::DateTime, @@ -95,7 +95,7 @@ pub struct ResponseChain { pub responses: Vec, /// Metadata about the chain - pub metadata: HashMap, + pub metadata: HashMap, } impl Default for ResponseChain { diff --git a/sgl-router/src/middleware.rs b/sgl-router/src/middleware.rs index 9fcd273cb..0c1f3098c 100644 --- a/sgl-router/src/middleware.rs +++ b/sgl-router/src/middleware.rs @@ -236,7 +236,7 @@ impl Default for ResponseLogger { } impl OnResponse for ResponseLogger { - fn on_response(self, response: &Response, latency: std::time::Duration, span: &Span) { + fn on_response(self, response: &Response, latency: Duration, span: &Span) { let status = response.status(); // Record these in the span for structured logging/observability tools @@ -345,10 +345,10 @@ pub struct QueuedRequest { /// Queue metrics for monitoring #[derive(Debug, Default)] pub struct QueueMetrics { - pub total_queued: std::sync::atomic::AtomicU64, - pub current_queued: std::sync::atomic::AtomicU64, - pub total_timeout: std::sync::atomic::AtomicU64, - pub total_rejected: std::sync::atomic::AtomicU64, + pub total_queued: AtomicU64, + pub current_queued: AtomicU64, + pub total_timeout: AtomicU64, + pub total_rejected: AtomicU64, } /// Queue processor that handles queued requests @@ -447,7 +447,7 @@ impl ConcurrencyLimiter { /// Middleware function for concurrency limiting with optional queuing pub async fn concurrency_limit_middleware( State(app_state): State>, - request: Request, + request: Request, next: Next, ) -> Response { // Static counter for embeddings queue size diff --git a/sgl-router/src/routers/grpc/streaming.rs b/sgl-router/src/routers/grpc/streaming.rs index 86d81f6c3..4920b30b1 100644 --- a/sgl-router/src/routers/grpc/streaming.rs +++ b/sgl-router/src/routers/grpc/streaming.rs @@ -827,8 +827,7 @@ impl StreamingProcessor { // Store latest output logprobs (cumulative from proto, convert to SGLang format) if let Some(ref output_logprobs) = chunk.output_logprobs { - let converted = - super::utils::convert_generate_output_logprobs(output_logprobs); + let converted = utils::convert_generate_output_logprobs(output_logprobs); accumulated_output_logprobs.insert(index, Some(converted)); } diff --git a/sgl-router/src/routers/grpc/utils.rs b/sgl-router/src/routers/grpc/utils.rs index 8b7490a45..01474fb10 100644 --- a/sgl-router/src/routers/grpc/utils.rs +++ b/sgl-router/src/routers/grpc/utils.rs @@ -522,7 +522,7 @@ pub fn parse_json_schema_response( match serde_json::from_str::(processed_text) { Ok(params) => { let tool_call = ToolCall { - id: format!("call_{}", uuid::Uuid::new_v4()), + id: format!("call_{}", Uuid::new_v4()), tool_type: "function".to_string(), function: FunctionCallResponse { name: function.name.clone(), @@ -553,7 +553,7 @@ pub fn parse_json_schema_response( let parameters = obj.get("parameters")?; Some(ToolCall { - id: format!("call_{}_{}", i, uuid::Uuid::new_v4()), + id: format!("call_{}_{}", i, Uuid::new_v4()), tool_type: "function".to_string(), function: FunctionCallResponse { name, diff --git a/sgl-router/src/routers/http/openai_router.rs b/sgl-router/src/routers/http/openai_router.rs index d14028592..33034457f 100644 --- a/sgl-router/src/routers/http/openai_router.rs +++ b/sgl-router/src/routers/http/openai_router.rs @@ -2016,7 +2016,7 @@ impl OpenAIRouter { /// /// Returns borrowed strings when possible to avoid allocations in hot paths. /// Only allocates when multiple data lines need to be joined. - fn parse_sse_block(block: &str) -> (Option<&str>, std::borrow::Cow<'_, str>) { + fn parse_sse_block(block: &str) -> (Option<&str>, Cow<'_, str>) { let mut event_name: Option<&str> = None; let mut data_lines: Vec<&str> = Vec::new(); @@ -2029,9 +2029,9 @@ impl OpenAIRouter { } let data = if data_lines.len() == 1 { - std::borrow::Cow::Borrowed(data_lines[0]) + Cow::Borrowed(data_lines[0]) } else { - std::borrow::Cow::Owned(data_lines.join("\n")) + Cow::Owned(data_lines.join("\n")) }; (event_name, data) @@ -2714,7 +2714,7 @@ impl OpenAIRouter { } ResponseInput::Items(items) => { // Items are already structured ResponseInputOutputItem, convert to JSON - if let Ok(items_value) = serde_json::to_value(items) { + if let Ok(items_value) = to_value(items) { if let Some(items_arr) = items_value.as_array() { input_array.extend_from_slice(items_arr); } @@ -2773,7 +2773,7 @@ impl OpenAIRouter { .unwrap_or("{}"); // Check if output contains error by parsing JSON - let is_error = serde_json::from_str::(output_str) + let is_error = serde_json::from_str::(output_str) .map(|v| v.get("error").is_some()) .unwrap_or(false); @@ -3189,7 +3189,7 @@ impl super::super::RouterTrait for OpenAIRouter { let content_type = res.headers().get(CONTENT_TYPE).cloned(); match res.bytes().await { Ok(body) => { - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = status; if let Some(ct) = content_type { response.headers_mut().insert(CONTENT_TYPE, ct); @@ -3316,7 +3316,7 @@ impl super::super::RouterTrait for OpenAIRouter { match resp.bytes().await { Ok(body) => { self.circuit_breaker.record_success(); - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = status; if let Some(ct) = content_type { response.headers_mut().insert(CONTENT_TYPE, ct); diff --git a/sgl-router/src/routers/http/pd_router.rs b/sgl-router/src/routers/http/pd_router.rs index e18e856f3..77feb22ae 100644 --- a/sgl-router/src/routers/http/pd_router.rs +++ b/sgl-router/src/routers/http/pd_router.rs @@ -88,7 +88,7 @@ impl PDRouter { match res.bytes().await { Ok(body) => { - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = StatusCode::OK; *response.headers_mut() = response_headers; response @@ -201,7 +201,7 @@ impl PDRouter { } obj.insert( "bootstrap_host".to_string(), - Value::Array(hosts.into_iter().map(serde_json::Value::from).collect()), + Value::Array(hosts.into_iter().map(Value::from).collect()), ); obj.insert( "bootstrap_port".to_string(), @@ -209,7 +209,7 @@ impl PDRouter { ports .into_iter() .map(|p| match p { - Some(v) => serde_json::Value::from(v), + Some(v) => Value::from(v), None => Value::Null, }) .collect(), @@ -217,23 +217,23 @@ impl PDRouter { ); obj.insert( "bootstrap_room".to_string(), - Value::Array(rooms.into_iter().map(serde_json::Value::from).collect()), + Value::Array(rooms.into_iter().map(Value::from).collect()), ); } else { obj.insert( "bootstrap_host".to_string(), - serde_json::Value::from(prefill_worker.bootstrap_host()), + Value::from(prefill_worker.bootstrap_host()), ); obj.insert( "bootstrap_port".to_string(), match prefill_worker.bootstrap_port() { - Some(v) => serde_json::Value::from(v), + Some(v) => Value::from(v), None => Value::Null, }, ); obj.insert( "bootstrap_room".to_string(), - serde_json::Value::from(super::pd_types::generate_room_id()), + Value::from(super::pd_types::generate_room_id()), ); } Ok(original) @@ -508,8 +508,7 @@ impl PDRouter { match res.bytes().await { Ok(decode_body) => { - let mut response = - Response::new(axum::body::Body::from(decode_body)); + let mut response = Response::new(Body::from(decode_body)); *response.status_mut() = status; *response.headers_mut() = response_headers; response @@ -1365,7 +1364,7 @@ mod tests { assert_eq!(decode_ref.load(), 0); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); + let stream = UnboundedReceiverStream::new(rx); let _response = router.create_streaming_response( stream.map(Ok), diff --git a/sgl-router/src/routers/http/router.rs b/sgl-router/src/routers/http/router.rs index af642cb2e..1ac198d43 100644 --- a/sgl-router/src/routers/http/router.rs +++ b/sgl-router/src/routers/http/router.rs @@ -95,7 +95,7 @@ impl Router { match res.bytes().await { Ok(body) => { - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = status; *response.headers_mut() = response_headers; response @@ -315,7 +315,7 @@ impl Router { let response_headers = header_utils::preserve_response_headers(res.headers()); match res.bytes().await { Ok(body) => { - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = status; *response.headers_mut() = response_headers; if status.is_success() { @@ -496,7 +496,7 @@ impl Router { let response = match res.bytes().await { Ok(body) => { - let mut response = Response::new(axum::body::Body::from(body)); + let mut response = Response::new(Body::from(body)); *response.status_mut() = status; *response.headers_mut() = response_headers; response diff --git a/sgl-router/src/tokenizer/mod.rs b/sgl-router/src/tokenizer/mod.rs index 5ff4cdbf1..b7edfaea1 100644 --- a/sgl-router/src/tokenizer/mod.rs +++ b/sgl-router/src/tokenizer/mod.rs @@ -42,7 +42,7 @@ pub struct Tokenizer(Arc); impl Tokenizer { /// Create a tokenizer from a file path pub fn from_file(file_path: &str) -> Result { - Ok(Tokenizer(factory::create_tokenizer_from_file(file_path)?)) + Ok(Tokenizer(create_tokenizer_from_file(file_path)?)) } /// Create a tokenizer from a file path with an optional chat template @@ -50,7 +50,7 @@ impl Tokenizer { file_path: &str, chat_template_path: Option<&str>, ) -> Result { - Ok(Tokenizer(factory::create_tokenizer_with_chat_template( + Ok(Tokenizer(create_tokenizer_with_chat_template( file_path, chat_template_path, )?)) diff --git a/sgl-router/src/tool_parser/parsers/kimik2_parser.rs b/sgl-router/src/tool_parser/parsers/kimik2_parser.rs index 9642713e9..b2d6e85d8 100644 --- a/sgl-router/src/tool_parser/parsers/kimik2_parser.rs +++ b/sgl-router/src/tool_parser/parsers/kimik2_parser.rs @@ -124,7 +124,7 @@ impl ToolParser for KimiK2Parser { // Parse function ID if let Some((func_name, _index)) = self.parse_function_id(function_id) { // Try to parse JSON arguments - match serde_json::from_str::(function_args) { + match serde_json::from_str::(function_args) { Ok(_) => { tools.push(ToolCall { function: FunctionCall {