From 795668dc73eecc09907b7f25161c53b0bdc3cc43 Mon Sep 17 00:00:00 2001 From: Yingchun Lai Date: Thu, 17 Jul 2025 08:55:59 +0800 Subject: [PATCH] feat: add tp_rank, pp_rank and dp_rank labels for scheduler metrics (#7597) Co-authored-by: Stefan He --- python/sglang/srt/managers/scheduler.py | 38 ++++++++++++------- .../scheduler_output_processor_mixin.py | 2 +- python/sglang/srt/server_args.py | 8 ++++ 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/python/sglang/srt/managers/scheduler.py b/python/sglang/srt/managers/scheduler.py index a7f893253..ab966f924 100644 --- a/python/sglang/srt/managers/scheduler.py +++ b/python/sglang/srt/managers/scheduler.py @@ -252,6 +252,9 @@ class Scheduler( self.enable_overlap = not server_args.disable_overlap_schedule self.skip_tokenizer_init = server_args.skip_tokenizer_init self.enable_metrics = server_args.enable_metrics + self.enable_metrics_for_all_schedulers = ( + server_args.enable_metrics_for_all_schedulers + ) self.enable_kv_cache_events = server_args.kv_events_config is not None self.stream_interval = server_args.stream_interval self.spec_algorithm = SpeculativeAlgorithm.from_string( @@ -281,9 +284,6 @@ class Scheduler( self.send_to_tokenizer = get_zmq_socket( context, zmq.PUSH, port_args.tokenizer_ipc_name, False ) - self.send_metrics_from_scheduler = get_zmq_socket( - context, zmq.PUSH, port_args.metrics_ipc_name, False - ) if server_args.skip_tokenizer_init: # Directly send to the TokenizerManager @@ -309,10 +309,14 @@ class Scheduler( else: self.recv_from_tokenizer = None self.recv_from_rpc = None - self.send_metrics_from_scheduler = None self.send_to_tokenizer = SimpleNamespace(send_pyobj=lambda x: None) self.send_to_detokenizer = SimpleNamespace(send_pyobj=lambda x: None) + if self.current_scheduler_metrics_enabled(): + self.send_metrics_from_scheduler = get_zmq_socket( + context, zmq.PUSH, port_args.metrics_ipc_name, False + ) + # Init tokenizer self.init_tokenizer() @@ -495,7 +499,7 @@ class 
Scheduler( self.init_profier() # Init metrics stats - self.init_metrics() + self.init_metrics(tp_rank, pp_rank, dp_rank) self.init_kv_events(server_args.kv_events_config) # Init request dispatcher @@ -537,6 +541,9 @@ class Scheduler( if get_bool_env_var("SGLANG_GC_LOG"): configure_gc_logger() + def current_scheduler_metrics_enabled(self): + return self.attn_tp_rank == 0 or self.enable_metrics_for_all_schedulers + def maybe_sleep_on_idle(self): if self.idle_sleeper is not None: self.idle_sleeper.maybe_sleep() @@ -660,7 +667,7 @@ class Scheduler( self.profile_in_progress: bool = False self.rpd_profiler = None - def init_metrics(self): + def init_metrics(self, tp_rank: int, pp_rank: int, dp_rank: Optional[int]): self.last_gen_throughput: float = 0.0 self.last_input_throughput: float = 0.0 self.step_time_dict = defaultdict(list) # Dict[batch size -> step time] @@ -671,12 +678,15 @@ class Scheduler( self.stats = SchedulerStats() if self.enable_metrics: engine_type = "unified" - self.metrics_collector = SchedulerMetricsCollector( - labels={ - "model_name": self.server_args.served_model_name, - "engine_type": engine_type, - }, - ) + labels = { + "model_name": self.server_args.served_model_name, + "engine_type": engine_type, + "tp_rank": tp_rank, + "pp_rank": pp_rank, + } + if dp_rank is not None: + labels["dp_rank"] = dp_rank + self.metrics_collector = SchedulerMetricsCollector(labels=labels) def init_kv_events(self, kv_events_config: Optional[str]): if self.enable_kv_cache_events: @@ -1519,7 +1529,7 @@ class Scheduler( if ( self.enable_metrics - and self.attn_tp_rank == 0 + and self.current_scheduler_metrics_enabled() and time.perf_counter() > self.metrics_collector.last_log_time + 30 ): # During idle time, also collect metrics every 30 seconds. 
@@ -1755,7 +1765,7 @@ class Scheduler( self.chunked_req.is_chunked += 1 # Print stats - if self.attn_tp_rank == 0: + if self.current_scheduler_metrics_enabled(): self.log_prefill_stats(adder, can_run_list, running_bs) # Create a new batch diff --git a/python/sglang/srt/managers/scheduler_output_processor_mixin.py b/python/sglang/srt/managers/scheduler_output_processor_mixin.py index 75bc4427a..635121920 100644 --- a/python/sglang/srt/managers/scheduler_output_processor_mixin.py +++ b/python/sglang/srt/managers/scheduler_output_processor_mixin.py @@ -290,7 +290,7 @@ class SchedulerOutputProcessorMixin: self.forward_ct_decode = (self.forward_ct_decode + 1) % (1 << 30) if ( - self.attn_tp_rank == 0 + self.current_scheduler_metrics_enabled() and self.forward_ct_decode % self.server_args.decode_log_interval == 0 ): self.log_decode_stats(can_run_cuda_graph, running_batch=batch) diff --git a/python/sglang/srt/server_args.py b/python/sglang/srt/server_args.py index 95ba9bee6..e475039d7 100644 --- a/python/sglang/srt/server_args.py +++ b/python/sglang/srt/server_args.py @@ -105,6 +105,7 @@ class ServerArgs: crash_dump_folder: Optional[str] = None show_time_cost: bool = False enable_metrics: bool = False + enable_metrics_for_all_schedulers: bool = False bucket_time_to_first_token: Optional[List[float]] = None bucket_e2e_request_latency: Optional[List[float]] = None bucket_inter_token_latency: Optional[List[float]] = None @@ -1002,6 +1003,13 @@ class ServerArgs: action="store_true", help="Enable log prometheus metrics.", ) + parser.add_argument( + "--enable-metrics-for-all-schedulers", + action="store_true", + help="Enable this flag when you want schedulers on all TP ranks (not just TP 0) " + "to record request metrics separately. This is especially useful when dp_attention is enabled, as " + "otherwise all metrics appear to come from TP 0.", + ) parser.add_argument( "--bucket-time-to-first-token", type=float,