diff --git a/python/sglang/srt/managers/schedule_batch.py b/python/sglang/srt/managers/schedule_batch.py
index 768a9176c..bab64bae1 100644
--- a/python/sglang/srt/managers/schedule_batch.py
+++ b/python/sglang/srt/managers/schedule_batch.py
@@ -474,6 +474,10 @@ class Req:
         self.temp_scaled_logprobs = False
         self.top_p_normalized_logprobs = False
 
+        # Latency breakdown
+        self.queue_time_start = None
+        self.queue_time_end = None
+
         # Logprobs (return values)
         self.input_token_logprobs_val: Optional[List[float]] = None
         self.input_token_logprobs_idx: Optional[List[int]] = None
diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py
index 8b0573e50..cf3de55cc 100644
--- a/python/sglang/srt/managers/scheduler.py
+++ b/python/sglang/srt/managers/scheduler.py
@@ -838,6 +838,7 @@ class Scheduler(
                 eos_token_ids=self.model_config.hf_eos_token_id,
             )
             req.tokenizer = self.tokenizer
+            req.queue_time_start = time.time()
 
             if (
                 recv_req.session_params is not None
@@ -852,6 +853,7 @@
             # Create a new request from a previous session
             session = self.sessions[recv_req.session_params.id]
             req = session.create_req(recv_req, self.tokenizer)
+            req.queue_time_start = time.time()
             if isinstance(req.finished_reason, FINISH_ABORT):
                 self._add_request_to_queue(req)
                 return
@@ -995,6 +997,7 @@ class Scheduler(
             req.finished_reason = FINISH_ABORT(
                 error_msg, HTTPStatus.BAD_REQUEST, "BadRequestError"
            )
+            req.queue_time_start = time.time()
             self.waiting_queue.append(req)
             return
 
@@ -1031,9 +1034,10 @@
             self._largest_prefill_len, adder.log_input_tokens
         )
 
+        num_new_seq = len(can_run_list)
         f = (
             f"Prefill batch. "
-            f"#new-seq: {len(can_run_list)}, "
+            f"#new-seq: {num_new_seq}, "
             f"#new-token: {adder.log_input_tokens}, "
             f"#cached-token: {adder.log_hit_tokens}, "
             f"token usage: {num_used / self.max_total_num_tokens:.2f}, "
@@ -1051,6 +1055,12 @@ class Scheduler(
             self.stats.token_usage = round(num_used / self.max_total_num_tokens, 2)
             self.stats.num_queue_reqs = len(self.waiting_queue)
             self.stats.cache_hit_rate = cache_hit_rate
+
+            total_queue_latency = 0
+            for req in can_run_list:
+                total_queue_latency += req.queue_time_end - req.queue_time_start
+            self.stats.avg_request_queue_latency = total_queue_latency / num_new_seq
+
             self.metrics_collector.log_stats(self.stats)
 
     def log_decode_stats(self):
@@ -1287,6 +1297,12 @@
         can_run_list: List[Req] = adder.can_run_list
        if len(can_run_list) == 0:
             return None
+
+        if self.enable_metrics:
+            # Only record queue time when enable_metrics is True, to avoid overhead.
+            for req in can_run_list:
+                req.queue_time_end = time.time()
+
         self.waiting_queue = [
             x for x in self.waiting_queue if x not in set(can_run_list)
         ]
diff --git a/python/sglang/srt/metrics/collector.py b/python/sglang/srt/metrics/collector.py
index 3694490f2..b881406e6 100644
--- a/python/sglang/srt/metrics/collector.py
+++ b/python/sglang/srt/metrics/collector.py
@@ -27,6 +27,7 @@ class SchedulerStats:
     num_queue_reqs: int = 0
     cache_hit_rate: float = 0.0
     spec_accept_length: float = 0.0
+    avg_request_queue_latency: float = 0.0
 
 
 class SchedulerMetricsCollector:
@@ -87,6 +88,13 @@ class SchedulerMetricsCollector:
             multiprocess_mode="mostrecent",
         )
 
+        self.avg_request_queue_latency = Gauge(
+            name="sglang:avg_request_queue_latency",
+            documentation="The average request queue latency for the last batch of requests, in seconds.",
+            labelnames=labels.keys(),
+            multiprocess_mode="mostrecent",
+        )
+
     def _log_gauge(self, gauge, data: Union[int, float]) -> None:
         # Convenience function for logging to gauge.
         gauge.labels(**self.labels).set(data)
@@ -99,6 +107,7 @@ class SchedulerMetricsCollector:
         self._log_gauge(self.num_queue_reqs, stats.num_queue_reqs)
         self._log_gauge(self.cache_hit_rate, stats.cache_hit_rate)
         self._log_gauge(self.spec_accept_length, stats.spec_accept_length)
+        self._log_gauge(self.avg_request_queue_latency, stats.avg_request_queue_latency)
         self.last_log_time = time.time()
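For reference (not part of the diff): a minimal, self-contained sketch of the accounting these scheduler changes perform. The `Req` stub and the wait times below are hypothetical; the averaging mirrors the new block in `log_prefill_stats`.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class Req:
    """Hypothetical stand-in for schedule_batch.Req, reduced to the two new timestamps."""
    queue_time_start: Optional[float] = None
    queue_time_end: Optional[float] = None


# Requests stamp queue_time_start when they enter the waiting queue
# (here: three requests that have been waiting 0.30s, 0.12s, and 0.05s).
can_run_list = [Req(queue_time_start=time.time() - d) for d in (0.30, 0.12, 0.05)]

# get_new_batch_prefill stamps queue_time_end when it pulls them into a batch.
now = time.time()
for req in can_run_list:
    req.queue_time_end = now

# log_prefill_stats then averages the per-request waits over the batch.
total_queue_latency = sum(r.queue_time_end - r.queue_time_start for r in can_run_list)
avg_request_queue_latency = total_queue_latency / len(can_run_list)
print(f"avg_request_queue_latency: {avg_request_queue_latency:.3f}s")  # ~0.157
```

Note that the gauge reflects only the most recent prefill batch (`multiprocess_mode="mostrecent"`), not a running average over all requests.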
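And a hypothetical smoke test for the exported gauge, assuming a server launched with `--enable-metrics` and scraped at the Prometheus `/metrics` endpoint on the default port 30000 (the port and endpoint are assumptions, not part of this diff):

```python
import urllib.request

# Scrape the Prometheus exposition endpoint and print the new gauge's samples.
body = urllib.request.urlopen("http://localhost:30000/metrics").read().decode()
for line in body.splitlines():
    if line.startswith("sglang:avg_request_queue_latency"):
        print(line)  # e.g. sglang:avg_request_queue_latency{model_name="..."} 0.0123
```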