[router] fix grpc connection conversion and add optimization (#11305)
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tonic::{transport::Channel, Request};
|
use tonic::{transport::Channel, Request};
|
||||||
use tracing::{debug, warn};
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::protocols::spec::{
|
use crate::protocols::spec::{
|
||||||
ChatCompletionRequest, GenerateRequest, ResponseFormat,
|
ChatCompletionRequest, GenerateRequest, ResponseFormat,
|
||||||
@@ -27,28 +27,23 @@ impl SglangSchedulerClient {
|
|||||||
pub async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
|
pub async fn connect(endpoint: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
debug!("Connecting to SGLang scheduler at {}", endpoint);
|
debug!("Connecting to SGLang scheduler at {}", endpoint);
|
||||||
|
|
||||||
// Convert grpc:// to http:// for tonic, preserving IPv6 bracket notation
|
// Convert grpc:// to http:// for tonic
|
||||||
let http_endpoint = if endpoint.starts_with("grpc://") {
|
let http_endpoint = if let Some(addr) = endpoint.strip_prefix("grpc://") {
|
||||||
// Use proper URL parsing to preserve IPv6 brackets
|
format!("http://{}", addr)
|
||||||
match url::Url::parse(endpoint) {
|
|
||||||
Ok(mut parsed) => {
|
|
||||||
let _ = parsed.set_scheme("http");
|
|
||||||
parsed.to_string()
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
warn!(
|
|
||||||
"Failed to parse gRPC endpoint '{}', using simple string replacement",
|
|
||||||
endpoint
|
|
||||||
);
|
|
||||||
endpoint.replace("grpc://", "http://")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
endpoint.to_string()
|
endpoint.to_string()
|
||||||
};
|
};
|
||||||
|
|
||||||
let channel = Channel::from_shared(http_endpoint)?
|
let channel = Channel::from_shared(http_endpoint)?
|
||||||
.timeout(Duration::from_secs(30))
|
.timeout(Duration::from_secs(30))
|
||||||
|
.http2_keep_alive_interval(Duration::from_secs(30))
|
||||||
|
.keep_alive_timeout(Duration::from_secs(10))
|
||||||
|
.keep_alive_while_idle(true)
|
||||||
|
.tcp_keepalive(Some(Duration::from_secs(60)))
|
||||||
|
.tcp_nodelay(true)
|
||||||
|
.http2_adaptive_window(true)
|
||||||
|
.initial_stream_window_size(Some(16 * 1024 * 1024)) // 16MB
|
||||||
|
.initial_connection_window_size(Some(32 * 1024 * 1024)) // 32MB
|
||||||
.connect()
|
.connect()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user