From 2eeb27515a8aa0957e4463f18d956f1624315ae2 Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Fri, 10 Oct 2025 20:43:07 -0400 Subject: [PATCH] [router] disable rate limiter by default (#11435) --- .../py_src/sglang_router/router_args.py | 6 +-- sgl-router/src/config/types.rs | 8 ++-- sgl-router/src/lib.rs | 10 ++--- sgl-router/src/main.rs | 4 +- sgl-router/src/middleware.rs | 36 ++++++++++------ sgl-router/src/server.rs | 43 ++++++++++++++----- sgl-router/src/service_discovery.rs | 2 +- 7 files changed, 69 insertions(+), 40 deletions(-) diff --git a/sgl-router/py_src/sglang_router/router_args.py b/sgl-router/py_src/sglang_router/router_args.py index b451da440..f3f3b8391 100644 --- a/sgl-router/py_src/sglang_router/router_args.py +++ b/sgl-router/py_src/sglang_router/router_args.py @@ -54,8 +54,8 @@ class RouterArgs: request_id_headers: Optional[List[str]] = None # Request timeout in seconds request_timeout_secs: int = 1800 - # Max concurrent requests for rate limiting - max_concurrent_requests: int = 256 + # Max concurrent requests for rate limiting (-1 to disable) + max_concurrent_requests: int = -1 # Queue size for pending requests when max concurrent limit reached queue_size: int = 100 # Maximum time (in seconds) a request can wait in queue before timing out @@ -409,7 +409,7 @@ class RouterArgs: f"--{prefix}max-concurrent-requests", type=int, default=RouterArgs.max_concurrent_requests, - help="Maximum number of concurrent requests allowed (for rate limiting)", + help="Maximum number of concurrent requests allowed (for rate limiting). Set to -1 to disable rate limiting.", ) parser.add_argument( f"--{prefix}queue-size", diff --git a/sgl-router/src/config/types.rs b/sgl-router/src/config/types.rs index 84edd4c13..f55f14b79 100644 --- a/sgl-router/src/config/types.rs +++ b/sgl-router/src/config/types.rs @@ -38,14 +38,14 @@ pub struct RouterConfig { pub log_level: Option, /// Custom request ID headers to check (defaults to common headers) pub request_id_headers: Option>, - /// Maximum concurrent requests allowed (for rate limiting) - pub max_concurrent_requests: usize, + /// Maximum concurrent requests allowed (for rate limiting). Set to -1 to disable rate limiting. + pub max_concurrent_requests: i32, /// Queue size for pending requests when max concurrent limit reached (0 = no queue, return 429 immediately) pub queue_size: usize, /// Maximum time (in seconds) a request can wait in queue before timing out pub queue_timeout_secs: u64, /// Token bucket refill rate (tokens per second). If not set, defaults to max_concurrent_requests - pub rate_limit_tokens_per_second: Option, + pub rate_limit_tokens_per_second: Option, /// CORS allowed origins pub cors_allowed_origins: Vec, /// Retry configuration @@ -436,7 +436,7 @@ impl Default for RouterConfig { log_dir: None, log_level: None, request_id_headers: None, - max_concurrent_requests: 256, + max_concurrent_requests: -1, queue_size: 100, queue_timeout_secs: 60, rate_limit_tokens_per_second: None, diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 3981ab68e..01b037ed1 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -65,7 +65,7 @@ struct Router { decode_urls: Option>, prefill_policy: Option, decode_policy: Option, - max_concurrent_requests: usize, + max_concurrent_requests: i32, cors_allowed_origins: Vec, retry_max_retries: u32, retry_initial_backoff_ms: u64, @@ -86,7 +86,7 @@ struct Router { enable_igw: bool, queue_size: usize, queue_timeout_secs: u64, - rate_limit_tokens_per_second: Option, + rate_limit_tokens_per_second: Option, connection_mode: config::ConnectionMode, model_path: Option, tokenizer_path: Option, @@ -260,7 +260,7 @@ impl Router { decode_urls = None, prefill_policy = None, decode_policy = None, - max_concurrent_requests = 256, + max_concurrent_requests = -1, cors_allowed_origins = vec![], retry_max_retries = 5, retry_initial_backoff_ms = 50, @@ -321,7 +321,7 @@ impl Router { decode_urls: Option>, prefill_policy: Option, decode_policy: Option, - max_concurrent_requests: usize, + max_concurrent_requests: i32, cors_allowed_origins: Vec, retry_max_retries: u32, retry_initial_backoff_ms: u64, @@ -342,7 +342,7 @@ impl Router { enable_igw: bool, queue_size: usize, queue_timeout_secs: u64, - rate_limit_tokens_per_second: Option, + rate_limit_tokens_per_second: Option, model_path: Option, tokenizer_path: Option, reasoning_parser: Option, diff --git a/sgl-router/src/main.rs b/sgl-router/src/main.rs index 0a0d738f8..60f422f5d 100644 --- a/sgl-router/src/main.rs +++ b/sgl-router/src/main.rs @@ -192,8 +192,8 @@ struct CliArgs { #[arg(long, default_value_t = 1800)] request_timeout_secs: u64, - #[arg(long, default_value_t = 256)] - max_concurrent_requests: usize, + #[arg(long, default_value_t = -1)] + max_concurrent_requests: i32, #[arg(long, num_args = 0..)] cors_allowed_origins: Vec, diff --git a/sgl-router/src/middleware.rs b/sgl-router/src/middleware.rs index 0c1f3098c..6e6344900 100644 --- a/sgl-router/src/middleware.rs +++ b/sgl-router/src/middleware.rs @@ -424,22 +424,23 @@ pub struct ConcurrencyLimiter { impl ConcurrencyLimiter { /// Create new concurrency limiter with optional queue pub fn new( - token_bucket: Arc, + token_bucket: Option>, queue_size: usize, queue_timeout: Duration, ) -> (Self, Option) { - if queue_size > 0 { - let (queue_tx, queue_rx) = mpsc::channel(queue_size); - let processor = QueueProcessor::new(token_bucket, queue_rx, queue_timeout); - - ( - Self { - queue_tx: Some(queue_tx), - }, - Some(processor), - ) - } else { - (Self { queue_tx: None }, None) + match (token_bucket, queue_size) { + (None, _) => (Self { queue_tx: None }, None), + (Some(bucket), size) if size > 0 => { + let (queue_tx, queue_rx) = mpsc::channel(size); + let processor = QueueProcessor::new(bucket, queue_rx, queue_timeout); + ( + Self { + queue_tx: Some(queue_tx), + }, + Some(processor), + ) + } + (Some(_), _) => (Self { queue_tx: None }, None), } } } @@ -450,12 +451,19 @@ pub async fn concurrency_limit_middleware( request: Request, next: Next, ) -> Response { + let token_bucket = match &app_state.context.rate_limiter { + Some(bucket) => bucket.clone(), + None => { + // Rate limiting disabled, pass through immediately + return next.run(request).await; + } + }; + // Static counter for embeddings queue size static EMBEDDINGS_QUEUE_SIZE: AtomicU64 = AtomicU64::new(0); // Identify if this is an embeddings request based on path let is_embeddings = request.uri().path().contains("/v1/embeddings"); - let token_bucket = app_state.context.rate_limiter.clone(); // Try to acquire token immediately if token_bucket.try_acquire(1.0).await.is_ok() { diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index 74d4158af..dacd88a5d 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -48,7 +48,7 @@ use tracing::{error, info, warn, Level}; pub struct AppContext { pub client: Client, pub router_config: RouterConfig, - pub rate_limiter: Arc, + pub rate_limiter: Option>, pub tokenizer: Option>, pub reasoning_parser_factory: Option, pub tool_parser_factory: Option, @@ -67,11 +67,20 @@ impl AppContext { pub fn new( router_config: RouterConfig, client: Client, - max_concurrent_requests: usize, - rate_limit_tokens_per_second: Option, + max_concurrent_requests: i32, + rate_limit_tokens_per_second: Option, ) -> Result { - let rate_limit_tokens = rate_limit_tokens_per_second.unwrap_or(max_concurrent_requests); - let rate_limiter = Arc::new(TokenBucket::new(max_concurrent_requests, rate_limit_tokens)); + let rate_limiter = match max_concurrent_requests { + n if n <= 0 => None, + n => { + let rate_limit_tokens = + rate_limit_tokens_per_second.filter(|&t| t > 0).unwrap_or(n); + Some(Arc::new(TokenBucket::new( + n as usize, + rate_limit_tokens as usize, + ))) + } + }; let (tokenizer, reasoning_parser_factory, tool_parser_factory) = if router_config.connection_mode == ConnectionMode::Grpc { @@ -916,12 +925,24 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box { + spawn(proc.run()); + info!( + "Started request queue (size: {}, timeout: {}s)", + config.router_config.queue_size, config.router_config.queue_timeout_secs + ); + } + None => { + info!( + "Rate limiting enabled (max_concurrent_requests = {}, queue disabled)", + config.router_config.max_concurrent_requests + ); + } } let app_state = Arc::new(AppState { diff --git a/sgl-router/src/service_discovery.rs b/sgl-router/src/service_discovery.rs index 00746bf1d..381df39fe 100644 --- a/sgl-router/src/service_discovery.rs +++ b/sgl-router/src/service_discovery.rs @@ -532,7 +532,7 @@ mod tests { Arc::new(AppContext { client: reqwest::Client::new(), router_config: router_config.clone(), - rate_limiter: Arc::new(TokenBucket::new(1000, 1000)), + rate_limiter: Some(Arc::new(TokenBucket::new(1000, 1000))), worker_registry: Arc::new(crate::core::WorkerRegistry::new()), policy_registry: Arc::new(crate::policies::PolicyRegistry::new( router_config.policy.clone(),