From 0fb88aaa77827d23b14fe64099121976adafce10 Mon Sep 17 00:00:00 2001 From: Byron Hsu Date: Wed, 11 Dec 2024 01:38:50 -0800 Subject: [PATCH] [router] Use borrow if possible to save cost (#2441) --- rust/src/router.rs | 37 +++++++++++++++++++++++-------------- rust/src/server.rs | 10 +++++----- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/rust/src/router.rs b/rust/src/router.rs index cbaa4673b..d07949d49 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -252,7 +252,7 @@ impl Router { async fn send_request( &self, client: &reqwest::Client, - worker_url: String, + worker_url: &str, route: &str, ) -> HttpResponse { match client.get(format!("{}{}", worker_url, route)).send().await { @@ -275,7 +275,7 @@ impl Router { pub async fn route_to_first(&self, client: &reqwest::Client, route: &str) -> HttpResponse { match self.select_first_worker() { - Ok(worker_url) => self.send_request(client, worker_url, route).await, + Ok(worker_url) => self.send_request(client, &worker_url, route).await, Err(e) => HttpResponse::InternalServerError().body(e), } } @@ -398,8 +398,8 @@ impl Router { async fn send_generate_request( &self, client: &reqwest::Client, - req: HttpRequest, - body: Bytes, + req: &HttpRequest, + body: &Bytes, route: &str, worker_url: &str, ) -> HttpResponse { @@ -484,8 +484,8 @@ impl Router { pub async fn route_generate_request( &self, client: &reqwest::Client, - req: HttpRequest, - body: Bytes, + req: &HttpRequest, + body: &Bytes, route: &str, ) -> HttpResponse { let worker_url = self.select_generate_worker(&body, route); @@ -493,7 +493,7 @@ impl Router { .await } - pub async fn add_worker(&self, worker_url: String) -> Result { + pub async fn add_worker(&self, worker_url: &str) -> Result { let interval_secs = 10; // check every 10 seconds let timeout_secs = 300; // 5 minutes @@ -517,11 +517,11 @@ impl Router { | Router::CacheAware { worker_urls, .. } => { info!("Worker {} health check passed", worker_url); let mut urls = worker_urls.write().unwrap(); - if urls.contains(&worker_url) { + if urls.contains(&worker_url.to_string()) { return Err(format!("Worker {} already exists", worker_url)); } info!("Added worker: {}", worker_url); - urls.push(worker_url.clone()); + urls.push(worker_url.to_string()); } } @@ -534,13 +534,16 @@ impl Router { } = self { // Add worker to running queue with initial count of 0 - running_queue.lock().unwrap().insert(worker_url.clone(), 0); + running_queue + .lock() + .unwrap() + .insert(worker_url.to_string(), 0); // Add worker to processed queue with initial count of 0 processed_queue .lock() .unwrap() - .insert(worker_url.clone(), 0); + .insert(worker_url.to_string(), 0); // Add worker to tree tree.lock().unwrap().insert(&"".to_string(), &worker_url); @@ -581,7 +584,7 @@ impl Router { } } - pub fn remove_worker(&self, worker_url: String) { + pub fn remove_worker(&self, worker_url: &str) { match self { Router::RoundRobin { worker_urls, .. } | Router::Random { worker_urls } @@ -602,8 +605,14 @@ impl Router { } = self { tree.lock().unwrap().remove_tenant(&worker_url); - running_queue.lock().unwrap().remove(&worker_url); - processed_queue.lock().unwrap().remove(&worker_url); + running_queue + .lock() + .unwrap() + .remove(&worker_url.to_string()); + processed_queue + .lock() + .unwrap() + .remove(&worker_url.to_string()); info!( "Removed worker from tree and cleaned up queues: {}", worker_url diff --git a/rust/src/server.rs b/rust/src/server.rs index 4b8328397..ded7bb565 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -63,7 +63,7 @@ async fn get_model_info(data: web::Data) -> impl Responder { #[post("/generate")] async fn generate(req: HttpRequest, body: Bytes, data: web::Data) -> impl Responder { data.router - .route_generate_request(&data.client, req, body, "/generate") + .route_generate_request(&data.client, &req, &body, "/generate") .await } @@ -74,7 +74,7 @@ async fn v1_chat_completions( data: web::Data, ) -> impl Responder { data.router - .route_generate_request(&data.client, req, body, "/v1/chat/completions") + .route_generate_request(&data.client, &req, &body, "/v1/chat/completions") .await } @@ -85,7 +85,7 @@ async fn v1_completions( data: web::Data, ) -> impl Responder { data.router - .route_generate_request(&data.client, req, body, "/v1/completions") + .route_generate_request(&data.client, &req, &body, "/v1/completions") .await } @@ -102,7 +102,7 @@ async fn add_worker( } }; - match data.router.add_worker(worker_url).await { + match data.router.add_worker(&worker_url).await { Ok(message) => HttpResponse::Ok().body(message), Err(error) => HttpResponse::BadRequest().body(error), } @@ -117,7 +117,7 @@ async fn remove_worker( Some(url) => url.to_string(), None => return HttpResponse::BadRequest().finish(), }; - data.router.remove_worker(worker_url); + data.router.remove_worker(&worker_url); HttpResponse::Ok().finish() }