diff --git a/python/sglang/srt/entrypoints/grpc_request_manager.py b/python/sglang/srt/entrypoints/grpc_request_manager.py index 7bb232ca4..c719f7c45 100644 --- a/python/sglang/srt/entrypoints/grpc_request_manager.py +++ b/python/sglang/srt/entrypoints/grpc_request_manager.py @@ -397,9 +397,7 @@ class GrpcRequestManager: # Wait for result in background async def wait_for_result(): try: - # Wait for completion await state.event.wait() - # Get result from queue result = await state.out_queue.get() future.set_result(result) except Exception as e: @@ -437,19 +435,6 @@ class GrpcRequestManager: return True - async def pause_generation(self): - """Pause generation processing.""" - async with self.is_pause_cond: - self.is_pause = True - logger.info("Generation paused") - - async def resume_generation(self): - """Resume generation processing.""" - async with self.is_pause_cond: - self.is_pause = False - self.is_pause_cond.notify_all() - logger.info("Generation resumed") - async def handle_loop(self): """ Main event loop - processes outputs from scheduler. 
diff --git a/python/sglang/srt/entrypoints/grpc_server.py b/python/sglang/srt/entrypoints/grpc_server.py index fea12300d..8a090da90 100644 --- a/python/sglang/srt/entrypoints/grpc_server.py +++ b/python/sglang/srt/entrypoints/grpc_server.py @@ -189,7 +189,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) # Start the request manager's event loop using auto_create_handle_loop self.request_manager.auto_create_handle_loop() - logger.info("Standalone gRPC scheduler service initialized") + logger.info("gRPC scheduler servicer initialized") async def Generate( self, @@ -197,7 +197,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) context: grpc.aio.ServicerContext, ) -> AsyncIterator[sglang_scheduler_pb2.GenerateResponse]: """Handle generation requests with streaming responses.""" - logger.info(f"Generation request: {request.request_id}") + logger.debug(f"Receive generation request: {request.request_id}") try: # Convert gRPC request to internal format @@ -249,7 +249,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) yield self._create_chunk_response(request.request_id, output) except Exception as e: - logger.error(f"Generate failed: {e}\n{get_exception_traceback()}") + logger.error( + f"Generate failed for request {request.request_id}: {e}\n" + f"{get_exception_traceback()}" + ) yield sglang_scheduler_pb2.GenerateResponse( request_id=request.request_id, error=sglang_scheduler_pb2.GenerateError( @@ -262,10 +265,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) async def Embed( self, request: sglang_scheduler_pb2.EmbedRequest, - context: grpc.aio.ServicerContext, + _context: grpc.aio.ServicerContext, ) -> sglang_scheduler_pb2.EmbedResponse: """Handle embedding requests.""" - logger.info(f"Embedding request: {request.request_id}") + logger.debug(f"Receive embedding request: {request.request_id}") try: # Convert request @@ -292,7 
+295,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) ) except Exception as e: - logger.error(f"Embed failed: {e}\n{get_exception_traceback()}") + logger.error( + f"Embed failed for request {request.request_id}: {e}\n" + f"{get_exception_traceback()}" + ) return sglang_scheduler_pb2.EmbedResponse( request_id=request.request_id, error=sglang_scheduler_pb2.EmbedError( @@ -344,7 +350,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) health_request.bootstrap_host = FAKE_BOOTSTRAP_HOST health_request.bootstrap_room = 0 - logger.info(f"Sending health check request to request manager...") + logger.debug(f"Receive health check request: {rid}") # Submit and wait for response output_generator = self.request_manager.generate_request( @@ -375,7 +381,7 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) ) except Exception as e: - logger.error(f"Health check failed: {e}") + logger.error(f"Health check failed: {e}\n{get_exception_traceback()}") return sglang_scheduler_pb2.HealthCheckResponse( healthy=False, message=f"Health check error: {str(e)}" ) @@ -383,10 +389,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) async def Abort( self, request: sglang_scheduler_pb2.AbortRequest, - context: grpc.aio.ServicerContext, + _context: grpc.aio.ServicerContext, ) -> sglang_scheduler_pb2.AbortResponse: """Abort an ongoing request.""" - logger.info(f"Aborting request: {request.request_id}") + logger.debug(f"Receive abort request: {request.request_id}") try: success = await self.request_manager.abort_request(request.request_id) @@ -396,7 +402,10 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) message=f"Request {request.request_id} {'aborted' if success else 'not found'}", ) except Exception as e: - logger.error(f"Abort failed: {e}") + logger.error( + f"Abort failed for request {request.request_id}: {e}\n" + 
f"{get_exception_traceback()}" + ) return sglang_scheduler_pb2.AbortResponse( success=False, message=str(e), @@ -404,11 +413,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) async def GetModelInfo( self, - request: sglang_scheduler_pb2.GetModelInfoRequest, - context: grpc.aio.ServicerContext, + _request: sglang_scheduler_pb2.GetModelInfoRequest, + _context: grpc.aio.ServicerContext, ) -> sglang_scheduler_pb2.GetModelInfoResponse: """Get model information.""" - logger.info("Model info request received") + logger.debug("Receive model info request") is_generation = self.scheduler_info.get("is_generation") if is_generation is None: @@ -435,11 +444,11 @@ class SGLangSchedulerServicer(sglang_scheduler_pb2_grpc.SglangSchedulerServicer) async def GetServerInfo( self, - request: sglang_scheduler_pb2.GetServerInfoRequest, - context: grpc.aio.ServicerContext, + _request: sglang_scheduler_pb2.GetServerInfoRequest, + _context: grpc.aio.ServicerContext, ) -> sglang_scheduler_pb2.GetServerInfoResponse: """Get server information.""" - logger.info("Server info request received") + logger.debug("Receive server info request") server_args_dict = dataclasses.asdict(self.server_args) server_args_struct = Struct() @@ -861,9 +870,8 @@ async def serve_grpc( listen_addr = f"{server_args.host}:{server_args.port}" server.add_insecure_port(listen_addr) - logger.info(f"Starting standalone gRPC server on {listen_addr}") - await server.start() + logger.info(f"gRPC server listening on {listen_addr}") # Handle shutdown signals loop = asyncio.get_running_loop() diff --git a/sgl-router/src/reasoning_parser/parsers/base.rs b/sgl-router/src/reasoning_parser/parsers/base.rs index 2aa8846a9..99e94c8cb 100644 --- a/sgl-router/src/reasoning_parser/parsers/base.rs +++ b/sgl-router/src/reasoning_parser/parsers/base.rs @@ -2,7 +2,6 @@ // for detecting and extracting reasoning blocks from text. 
use crate::reasoning_parser::traits::{ParseError, ParserConfig, ParserResult, ReasoningParser}; -use tracing as log; /// Base reasoning parser implementation. /// @@ -46,18 +45,14 @@ impl BaseReasoningParser { impl ReasoningParser for BaseReasoningParser { fn detect_and_parse_reasoning(&mut self, text: &str) -> Result<ParserResult, ParseError> { - log::debug!("detect_and_parse_reasoning called with text: {:?}", text); - // Check input size against buffer limit if text.len() > self.config.max_buffer_size { return Err(ParseError::BufferOverflow(text.len())); } let in_reasoning = self.in_reasoning || text.contains(&self.config.think_start_token); - log::debug!("in_reasoning: {}", in_reasoning); if !in_reasoning { - log::debug!("No reasoning detected, returning normal text."); return Ok(ParserResult::normal(text.to_string())); } @@ -66,15 +61,8 @@ impl ReasoningParser for BaseReasoningParser { .replace(&self.config.think_start_token, "") .trim() .to_string(); - log::debug!( - "Processed text after removing think_start_token: {:?}", - processed_text - ); if !processed_text.contains(&self.config.think_end_token) { - log::debug!( - "Reasoning truncated, think_end_token not found. Returning reasoning text." 
- ); // Assume reasoning was truncated before end token return Ok(ParserResult::reasoning(processed_text)); } @@ -89,9 +77,6 @@ impl ReasoningParser for BaseReasoningParser { .map(|s| s.trim().to_string()) .unwrap_or_default(); - log::debug!("Extracted reasoning_text: {:?}", reasoning_text); - log::debug!("Extracted normal_text: {:?}", normal_text); - Ok(ParserResult::new(normal_text, reasoning_text)) } @@ -108,19 +93,6 @@ impl ReasoningParser for BaseReasoningParser { self.buffer.push_str(text); let mut current_text = self.buffer.clone(); - log::debug!( - "parse_reasoning_streaming_incremental called with text: {:?}", - text - ); - log::debug!("current buffer: {:?}", self.buffer); - log::debug!("current_text: {:?}", current_text); - log::debug!( - "in_reasoning: {}, stripped_think_start: {}, stream_reasoning: {}", - self.in_reasoning, - self.stripped_think_start, - self.config.stream_reasoning - ); - // If the current text is a prefix of a token, keep buffering if self.is_partial_token(&current_text) { return Ok(ParserResult::default()); diff --git a/sgl-router/src/routers/grpc/pipeline.rs b/sgl-router/src/routers/grpc/pipeline.rs index 17e88bd8a..7f8ed2387 100644 --- a/sgl-router/src/routers/grpc/pipeline.rs +++ b/sgl-router/src/routers/grpc/pipeline.rs @@ -56,8 +56,6 @@ pub struct PreparationStage; #[async_trait] impl PipelineStage for PreparationStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Processing request", self.name()); - // Clone Arc before match to avoid borrow checker issues // (matching borrows ctx, but prepare_* methods need mutable borrow) // Arc clone is cheap (8 bytes) - avoids full request clone (15KB-200KB) @@ -109,7 +107,6 @@ impl PreparationStage { }; let token_ids = encoding.token_ids().to_vec(); - debug!("Tokenized {} tokens from input", token_ids.len()); // Step 4: Build tool constraints if needed let tool_call_constraint = body_ref.tools.as_ref().and_then(|tools| { @@ -157,8 +154,6 @@ impl 
PreparationStage { } }; - debug!("Resolved input with {} tokens", token_ids.len()); - // Create stop sequence decoder for generate requests let params = request.sampling_params.as_ref(); let stop_decoder = utils::create_stop_decoder( @@ -259,8 +254,6 @@ impl WorkerSelectionStage { #[async_trait] impl PipelineStage for WorkerSelectionStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Selecting workers", self.name()); - let prep = ctx .state .preparation @@ -414,8 +407,6 @@ pub struct ClientAcquisitionStage; #[async_trait] impl PipelineStage for ClientAcquisitionStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Acquiring gRPC clients", self.name()); - let workers = ctx .state .workers @@ -464,8 +455,6 @@ impl RequestBuildingStage { #[async_trait] impl PipelineStage for RequestBuildingStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Building proto request", self.name()); - let prep = ctx .state .preparation @@ -578,8 +567,6 @@ pub struct DispatchMetadataStage; #[async_trait] impl PipelineStage for DispatchMetadataStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Preparing dispatch metadata", self.name()); - let proto_request = ctx .state .proto_request @@ -656,8 +643,6 @@ impl RequestExecutionStage { #[async_trait] impl PipelineStage for RequestExecutionStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Executing gRPC request", self.name()); - let proto_request = ctx .state .proto_request @@ -713,8 +698,6 @@ impl RequestExecutionStage { .dual_mut() .ok_or_else(|| utils::internal_error_static("Expected dual clients but got single"))?; - debug!("Sending concurrent requests to prefill and decode workers"); - let prefill_request = proto_request.clone(); let decode_request = proto_request; @@ -780,8 +763,6 @@ impl 
ResponseProcessingStage { #[async_trait] impl PipelineStage for ResponseProcessingStage { async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> { - debug!("Stage {}: Processing response", self.name()); - // Delegate to request-type specific processing match &ctx.input.request_type { RequestType::Chat(_) => return self.process_chat_response(ctx).await, @@ -1199,15 +1180,9 @@ impl ChatCompletionPipeline { // Execute each stage in sequence for (idx, stage) in self.stages.iter().enumerate() { - debug!("Executing stage {}: {}", idx + 1, stage.name()); match stage.execute(&mut ctx).await { Ok(Some(response)) => { // Stage completed successfully with a response (e.g., streaming) - debug!( - "Stage {} ({}) completed with response", - idx + 1, - stage.name() - ); return response; } Ok(None) => { @@ -1249,15 +1224,9 @@ impl ChatCompletionPipeline { // Execute each stage in sequence for (idx, stage) in self.stages.iter().enumerate() { - debug!("Executing stage {}: {}", idx + 1, stage.name()); match stage.execute(&mut ctx).await { Ok(Some(response)) => { // Stage completed successfully with a response (e.g., streaming) - debug!( - "Stage {} ({}) completed with response", - idx + 1, - stage.name() - ); return response; } Ok(None) => { diff --git a/sgl-router/src/routers/grpc/utils.rs b/sgl-router/src/routers/grpc/utils.rs index cc05cb32d..8b7490a45 100644 --- a/sgl-router/src/routers/grpc/utils.rs +++ b/sgl-router/src/routers/grpc/utils.rs @@ -21,7 +21,7 @@ use serde_json::{json, Map, Value}; use std::collections::HashMap; use std::sync::Arc; use tonic::codec::Streaming; -use tracing::{debug, error, warn}; +use tracing::{error, warn}; use uuid::Uuid; /// Get gRPC client from worker, returning appropriate error response on failure @@ -602,10 +602,6 @@ pub async fn collect_stream_responses( Ok(gen_response) => { match gen_response.response { Some(Complete(complete)) => { - debug!( - "{} completed: prompt_tokens={}, completion_tokens={}, finish_reason={}", 
worker_name, complete.prompt_tokens, complete.completion_tokens, complete.finish_reason - ); all_responses.push(complete); } Some(Error(err)) => { @@ -615,11 +611,11 @@ pub async fn collect_stream_responses( worker_name, err.message ))); } - Some(Chunk(chunk)) => { - debug!("{} chunk: {} tokens", worker_name, chunk.token_ids.len()); + Some(Chunk(_chunk)) => { + // Streaming chunk - no action needed } None => { - debug!("{}: empty response", worker_name); + // Empty response - no action needed } } } @@ -633,7 +629,6 @@ pub async fn collect_stream_responses( } } - debug!("{} stream closed", worker_name); Ok(all_responses) }