[metrics] Add in queue metrics (#4444)
This commit is contained in:
@@ -474,6 +474,10 @@ class Req:
|
||||
self.temp_scaled_logprobs = False
|
||||
self.top_p_normalized_logprobs = False
|
||||
|
||||
# Latency Breakdown
|
||||
self.queue_time_start = None
|
||||
self.queue_time_end = None
|
||||
|
||||
# Logprobs (return values)
|
||||
self.input_token_logprobs_val: Optional[List[float]] = None
|
||||
self.input_token_logprobs_idx: Optional[List[int]] = None
|
||||
|
||||
@@ -838,6 +838,7 @@ class Scheduler(
|
||||
eos_token_ids=self.model_config.hf_eos_token_id,
|
||||
)
|
||||
req.tokenizer = self.tokenizer
|
||||
req.queue_time_start = time.time()
|
||||
|
||||
if (
|
||||
recv_req.session_params is not None
|
||||
@@ -852,6 +853,7 @@ class Scheduler(
|
||||
# Create a new request from a previous session
|
||||
session = self.sessions[recv_req.session_params.id]
|
||||
req = session.create_req(recv_req, self.tokenizer)
|
||||
req.queue_time_start = time.time()
|
||||
if isinstance(req.finished_reason, FINISH_ABORT):
|
||||
self._add_request_to_queue(req)
|
||||
return
|
||||
@@ -995,6 +997,7 @@ class Scheduler(
|
||||
req.finished_reason = FINISH_ABORT(
|
||||
error_msg, HTTPStatus.BAD_REQUEST, "BadRequestError"
|
||||
)
|
||||
req.queue_time_start = time.time()
|
||||
self.waiting_queue.append(req)
|
||||
return
|
||||
|
||||
@@ -1031,9 +1034,10 @@ class Scheduler(
|
||||
self._largest_prefill_len, adder.log_input_tokens
|
||||
)
|
||||
|
||||
num_new_seq = len(can_run_list)
|
||||
f = (
|
||||
f"Prefill batch. "
|
||||
f"#new-seq: {len(can_run_list)}, "
|
||||
f"#new-seq: {num_new_seq}, "
|
||||
f"#new-token: {adder.log_input_tokens}, "
|
||||
f"#cached-token: {adder.log_hit_tokens}, "
|
||||
f"token usage: {num_used / self.max_total_num_tokens:.2f}, "
|
||||
@@ -1051,6 +1055,12 @@ class Scheduler(
|
||||
self.stats.token_usage = round(num_used / self.max_total_num_tokens, 2)
|
||||
self.stats.num_queue_reqs = len(self.waiting_queue)
|
||||
self.stats.cache_hit_rate = cache_hit_rate
|
||||
|
||||
total_queue_latency = 0
|
||||
for req in can_run_list:
|
||||
total_queue_latency += req.queue_time_end - req.queue_time_start
|
||||
self.stats.avg_request_queue_latency = total_queue_latency / num_new_seq
|
||||
|
||||
self.metrics_collector.log_stats(self.stats)
|
||||
|
||||
def log_decode_stats(self):
|
||||
@@ -1287,6 +1297,12 @@ class Scheduler(
|
||||
can_run_list: List[Req] = adder.can_run_list
|
||||
if len(can_run_list) == 0:
|
||||
return None
|
||||
|
||||
if self.enable_metrics:
|
||||
# only record queue time when enable_metrics is True to avoid overhead
|
||||
for req in can_run_list:
|
||||
req.queue_time_end = time.time()
|
||||
|
||||
self.waiting_queue = [
|
||||
x for x in self.waiting_queue if x not in set(can_run_list)
|
||||
]
|
||||
|
||||
@@ -27,6 +27,7 @@ class SchedulerStats:
|
||||
num_queue_reqs: int = 0
|
||||
cache_hit_rate: float = 0.0
|
||||
spec_accept_length: float = 0.0
|
||||
avg_request_queue_latency: float = 0.0
|
||||
|
||||
|
||||
class SchedulerMetricsCollector:
|
||||
@@ -87,6 +88,13 @@ class SchedulerMetricsCollector:
|
||||
multiprocess_mode="mostrecent",
|
||||
)
|
||||
|
||||
self.avg_request_queue_latency = Gauge(
|
||||
name="sglang:avg_request_queue_latency",
|
||||
documentation="The average request queue latency for the last batch of requests in seconds.",
|
||||
labelnames=labels.keys(),
|
||||
multiprocess_mode="mostrecent",
|
||||
)
|
||||
|
||||
def _log_gauge(self, gauge, data: Union[int, float]) -> None:
|
||||
# Convenience function for logging to gauge.
|
||||
gauge.labels(**self.labels).set(data)
|
||||
@@ -99,6 +107,7 @@ class SchedulerMetricsCollector:
|
||||
self._log_gauge(self.num_queue_reqs, stats.num_queue_reqs)
|
||||
self._log_gauge(self.cache_hit_rate, stats.cache_hit_rate)
|
||||
self._log_gauge(self.spec_accept_length, stats.spec_accept_length)
|
||||
self._log_gauge(self.avg_request_queue_latency, stats.avg_request_queue_latency)
|
||||
self.last_log_time = time.time()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user