[router] Use borrow if possible to save cost (#2441)
This commit is contained in:
@@ -252,7 +252,7 @@ impl Router {
|
|||||||
async fn send_request(
|
async fn send_request(
|
||||||
&self,
|
&self,
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
worker_url: String,
|
worker_url: &str,
|
||||||
route: &str,
|
route: &str,
|
||||||
) -> HttpResponse {
|
) -> HttpResponse {
|
||||||
match client.get(format!("{}{}", worker_url, route)).send().await {
|
match client.get(format!("{}{}", worker_url, route)).send().await {
|
||||||
@@ -275,7 +275,7 @@ impl Router {
|
|||||||
|
|
||||||
pub async fn route_to_first(&self, client: &reqwest::Client, route: &str) -> HttpResponse {
|
pub async fn route_to_first(&self, client: &reqwest::Client, route: &str) -> HttpResponse {
|
||||||
match self.select_first_worker() {
|
match self.select_first_worker() {
|
||||||
Ok(worker_url) => self.send_request(client, worker_url, route).await,
|
Ok(worker_url) => self.send_request(client, &worker_url, route).await,
|
||||||
Err(e) => HttpResponse::InternalServerError().body(e),
|
Err(e) => HttpResponse::InternalServerError().body(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -398,8 +398,8 @@ impl Router {
|
|||||||
async fn send_generate_request(
|
async fn send_generate_request(
|
||||||
&self,
|
&self,
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
req: HttpRequest,
|
req: &HttpRequest,
|
||||||
body: Bytes,
|
body: &Bytes,
|
||||||
route: &str,
|
route: &str,
|
||||||
worker_url: &str,
|
worker_url: &str,
|
||||||
) -> HttpResponse {
|
) -> HttpResponse {
|
||||||
@@ -484,8 +484,8 @@ impl Router {
|
|||||||
pub async fn route_generate_request(
|
pub async fn route_generate_request(
|
||||||
&self,
|
&self,
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
req: HttpRequest,
|
req: &HttpRequest,
|
||||||
body: Bytes,
|
body: &Bytes,
|
||||||
route: &str,
|
route: &str,
|
||||||
) -> HttpResponse {
|
) -> HttpResponse {
|
||||||
let worker_url = self.select_generate_worker(&body, route);
|
let worker_url = self.select_generate_worker(&body, route);
|
||||||
@@ -493,7 +493,7 @@ impl Router {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn add_worker(&self, worker_url: String) -> Result<String, String> {
|
pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
|
||||||
let interval_secs = 10; // check every 10 seconds
|
let interval_secs = 10; // check every 10 seconds
|
||||||
let timeout_secs = 300; // 5 minutes
|
let timeout_secs = 300; // 5 minutes
|
||||||
|
|
||||||
@@ -517,11 +517,11 @@ impl Router {
|
|||||||
| Router::CacheAware { worker_urls, .. } => {
|
| Router::CacheAware { worker_urls, .. } => {
|
||||||
info!("Worker {} health check passed", worker_url);
|
info!("Worker {} health check passed", worker_url);
|
||||||
let mut urls = worker_urls.write().unwrap();
|
let mut urls = worker_urls.write().unwrap();
|
||||||
if urls.contains(&worker_url) {
|
if urls.contains(&worker_url.to_string()) {
|
||||||
return Err(format!("Worker {} already exists", worker_url));
|
return Err(format!("Worker {} already exists", worker_url));
|
||||||
}
|
}
|
||||||
info!("Added worker: {}", worker_url);
|
info!("Added worker: {}", worker_url);
|
||||||
urls.push(worker_url.clone());
|
urls.push(worker_url.to_string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -534,13 +534,16 @@ impl Router {
|
|||||||
} = self
|
} = self
|
||||||
{
|
{
|
||||||
// Add worker to running queue with initial count of 0
|
// Add worker to running queue with initial count of 0
|
||||||
running_queue.lock().unwrap().insert(worker_url.clone(), 0);
|
running_queue
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(worker_url.to_string(), 0);
|
||||||
|
|
||||||
// Add worker to processed queue with initial count of 0
|
// Add worker to processed queue with initial count of 0
|
||||||
processed_queue
|
processed_queue
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(worker_url.clone(), 0);
|
.insert(worker_url.to_string(), 0);
|
||||||
|
|
||||||
// Add worker to tree
|
// Add worker to tree
|
||||||
tree.lock().unwrap().insert(&"".to_string(), &worker_url);
|
tree.lock().unwrap().insert(&"".to_string(), &worker_url);
|
||||||
@@ -581,7 +584,7 @@ impl Router {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove_worker(&self, worker_url: String) {
|
pub fn remove_worker(&self, worker_url: &str) {
|
||||||
match self {
|
match self {
|
||||||
Router::RoundRobin { worker_urls, .. }
|
Router::RoundRobin { worker_urls, .. }
|
||||||
| Router::Random { worker_urls }
|
| Router::Random { worker_urls }
|
||||||
@@ -602,8 +605,14 @@ impl Router {
|
|||||||
} = self
|
} = self
|
||||||
{
|
{
|
||||||
tree.lock().unwrap().remove_tenant(&worker_url);
|
tree.lock().unwrap().remove_tenant(&worker_url);
|
||||||
running_queue.lock().unwrap().remove(&worker_url);
|
running_queue
|
||||||
processed_queue.lock().unwrap().remove(&worker_url);
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.remove(&worker_url.to_string());
|
||||||
|
processed_queue
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.remove(&worker_url.to_string());
|
||||||
info!(
|
info!(
|
||||||
"Removed worker from tree and cleaned up queues: {}",
|
"Removed worker from tree and cleaned up queues: {}",
|
||||||
worker_url
|
worker_url
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ async fn get_model_info(data: web::Data<AppState>) -> impl Responder {
|
|||||||
#[post("/generate")]
|
#[post("/generate")]
|
||||||
async fn generate(req: HttpRequest, body: Bytes, data: web::Data<AppState>) -> impl Responder {
|
async fn generate(req: HttpRequest, body: Bytes, data: web::Data<AppState>) -> impl Responder {
|
||||||
data.router
|
data.router
|
||||||
.route_generate_request(&data.client, req, body, "/generate")
|
.route_generate_request(&data.client, &req, &body, "/generate")
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -74,7 +74,7 @@ async fn v1_chat_completions(
|
|||||||
data: web::Data<AppState>,
|
data: web::Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
data.router
|
data.router
|
||||||
.route_generate_request(&data.client, req, body, "/v1/chat/completions")
|
.route_generate_request(&data.client, &req, &body, "/v1/chat/completions")
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,7 +85,7 @@ async fn v1_completions(
|
|||||||
data: web::Data<AppState>,
|
data: web::Data<AppState>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
data.router
|
data.router
|
||||||
.route_generate_request(&data.client, req, body, "/v1/completions")
|
.route_generate_request(&data.client, &req, &body, "/v1/completions")
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +102,7 @@ async fn add_worker(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
match data.router.add_worker(worker_url).await {
|
match data.router.add_worker(&worker_url).await {
|
||||||
Ok(message) => HttpResponse::Ok().body(message),
|
Ok(message) => HttpResponse::Ok().body(message),
|
||||||
Err(error) => HttpResponse::BadRequest().body(error),
|
Err(error) => HttpResponse::BadRequest().body(error),
|
||||||
}
|
}
|
||||||
@@ -117,7 +117,7 @@ async fn remove_worker(
|
|||||||
Some(url) => url.to_string(),
|
Some(url) => url.to_string(),
|
||||||
None => return HttpResponse::BadRequest().finish(),
|
None => return HttpResponse::BadRequest().finish(),
|
||||||
};
|
};
|
||||||
data.router.remove_worker(worker_url);
|
data.router.remove_worker(&worker_url);
|
||||||
HttpResponse::Ok().finish()
|
HttpResponse::Ok().finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user