From 24247b416875bfedf367c06a8fae6843abe4d11f Mon Sep 17 00:00:00 2001
From: Simo Lin
Date: Mon, 18 Aug 2025 09:25:51 -0700
Subject: [PATCH] [router] add tokenizer metrics (#9307)

Co-authored-by: Chang Su
---
 sgl-router/src/metrics.rs               | 246 ++++++++++++++++++++++++
 sgl-router/src/tokenizer/factory.rs     |  28 ++-
 sgl-router/src/tokenizer/huggingface.rs |  44 ++++-
 sgl-router/src/tokenizer/stop.rs        |  23 +++
 sgl-router/src/tokenizer/stream.rs      |  14 ++
 5 files changed, 344 insertions(+), 11 deletions(-)

diff --git a/sgl-router/src/metrics.rs b/sgl-router/src/metrics.rs
index f16bb3235..afcccb549 100644
--- a/sgl-router/src/metrics.rs
+++ b/sgl-router/src/metrics.rs
@@ -148,6 +148,94 @@ pub fn init_metrics() {
         "sgl_router_running_requests",
         "Number of running requests per worker"
     );
+
+    // Tokenizer metrics
+    describe_histogram!(
+        "sgl_tokenizer_encode_duration_seconds",
+        "Time to encode text to tokens"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_decode_duration_seconds",
+        "Time to decode tokens to text"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_encode_batch_duration_seconds",
+        "Time to encode a batch of texts"
+    );
+    describe_counter!(
+        "sgl_tokenizer_encode_requests_total",
+        "Total number of encode requests by tokenizer type"
+    );
+    describe_counter!(
+        "sgl_tokenizer_decode_requests_total",
+        "Total number of decode requests by tokenizer type"
+    );
+    describe_counter!(
+        "sgl_tokenizer_encode_errors_total",
+        "Total number of encode errors by error type"
+    );
+    describe_counter!(
+        "sgl_tokenizer_decode_errors_total",
+        "Total number of decode errors by error type"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_tokens_per_encode",
+        "Number of tokens produced per encode operation"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_chars_per_encode",
+        "Number of characters in input text per encode"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_tokens_per_decode",
+        "Number of tokens decoded per operation"
+    );
+    describe_gauge!(
+        "sgl_tokenizer_vocab_size",
+        "Vocabulary size of the loaded tokenizer"
+    );
+
+    // Stop sequence detection metrics
+    describe_counter!(
+        "sgl_tokenizer_stop_sequences_detected_total",
+        "Total stop sequences detected by type"
+    );
+    describe_counter!(
+        "sgl_tokenizer_partial_matches_total",
+        "Total partial stop sequence matches (jailed text)"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_stop_detection_duration_seconds",
+        "Time to check for stop sequences per token"
+    );
+
+    // Streaming decode metrics
+    describe_counter!(
+        "sgl_tokenizer_stream_tokens_total",
+        "Total tokens processed in streaming decode"
+    );
+    describe_counter!(
+        "sgl_tokenizer_stream_incomplete_utf8_total",
+        "Total incomplete UTF-8 sequences detected"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_stream_step_duration_seconds",
+        "Time per streaming decode step"
+    );
+
+    // Factory metrics
+    describe_counter!(
+        "sgl_tokenizer_factory_loads_total",
+        "Total tokenizer loads by file type"
+    );
+    describe_counter!(
+        "sgl_tokenizer_factory_errors_total",
+        "Total tokenizer loading errors by type"
+    );
+    describe_histogram!(
+        "sgl_tokenizer_factory_load_duration_seconds",
+        "Time to load and initialize tokenizer"
+    );
 }
 
 pub fn start_prometheus(config: PrometheusConfig) {
@@ -177,6 +265,8 @@ pub fn start_prometheus(config: PrometheusConfig) {
 
 pub struct RouterMetrics;
 
+pub struct TokenizerMetrics;
+
 impl RouterMetrics {
     // Request metrics
     pub fn record_request(route: &str) {
@@ -384,6 +474,122 @@ impl RouterMetrics {
     }
 }
 
+impl TokenizerMetrics {
+    // Encoding metrics
+    pub fn record_encode_request(tokenizer_type: &str) {
+        counter!("sgl_tokenizer_encode_requests_total",
+            "tokenizer_type" => tokenizer_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_encode_duration(duration: Duration) {
+        histogram!("sgl_tokenizer_encode_duration_seconds").record(duration.as_secs_f64());
+    }
+
+    pub fn record_encode_error(error_type: &str) {
+        counter!("sgl_tokenizer_encode_errors_total",
+            "error_type" => error_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_tokens_per_encode(token_count: usize) {
+        histogram!("sgl_tokenizer_tokens_per_encode").record(token_count as f64);
+    }
+
+    pub fn record_chars_per_encode(char_count: usize) {
+        histogram!("sgl_tokenizer_chars_per_encode").record(char_count as f64);
+    }
+
+    // Decoding metrics
+    pub fn record_decode_request(tokenizer_type: &str) {
+        counter!("sgl_tokenizer_decode_requests_total",
+            "tokenizer_type" => tokenizer_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_decode_duration(duration: Duration) {
+        histogram!("sgl_tokenizer_decode_duration_seconds").record(duration.as_secs_f64());
+    }
+
+    pub fn record_decode_error(error_type: &str) {
+        counter!("sgl_tokenizer_decode_errors_total",
+            "error_type" => error_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_tokens_per_decode(token_count: usize) {
+        histogram!("sgl_tokenizer_tokens_per_decode").record(token_count as f64);
+    }
+
+    // Batch encoding metrics
+    pub fn record_encode_batch_duration(duration: Duration, batch_size: usize) {
+        histogram!("sgl_tokenizer_encode_batch_duration_seconds",
+            "batch_size" => batch_size.to_string()
+        )
+        .record(duration.as_secs_f64());
+    }
+
+    // Stop sequence detection metrics
+    pub fn record_stop_sequence_detected(stop_type: &str) {
+        counter!("sgl_tokenizer_stop_sequences_detected_total",
+            "type" => stop_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_partial_match() {
+        counter!("sgl_tokenizer_partial_matches_total").increment(1);
+    }
+
+    pub fn record_stop_detection_duration(duration: Duration) {
+        histogram!("sgl_tokenizer_stop_detection_duration_seconds").record(duration.as_secs_f64());
+    }
+
+    // Streaming decode metrics
+    pub fn record_stream_token() {
+        counter!("sgl_tokenizer_stream_tokens_total").increment(1);
+    }
+
+    pub fn record_incomplete_utf8() {
+        counter!("sgl_tokenizer_stream_incomplete_utf8_total").increment(1);
+    }
+
+    pub fn record_stream_step_duration(duration: Duration) {
+        histogram!("sgl_tokenizer_stream_step_duration_seconds").record(duration.as_secs_f64());
+    }
+
+    // Factory metrics
+    pub fn record_factory_load(file_type: &str) {
+        counter!("sgl_tokenizer_factory_loads_total",
+            "file_type" => file_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_factory_error(error_type: &str) {
+        counter!("sgl_tokenizer_factory_errors_total",
+            "error_type" => error_type.to_string()
+        )
+        .increment(1);
+    }
+
+    pub fn record_factory_load_duration(duration: Duration) {
+        histogram!("sgl_tokenizer_factory_load_duration_seconds").record(duration.as_secs_f64());
+    }
+
+    // Vocabulary metrics
+    pub fn set_vocab_size(tokenizer_type: &str, size: usize) {
+        gauge!("sgl_tokenizer_vocab_size",
+            "tokenizer_type" => tokenizer_type.to_string()
+        )
+        .set(size as f64);
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -646,6 +852,46 @@ mod tests {
         RouterMetrics::set_running_requests("http://worker1", 15);
     }
 
+    #[test]
+    fn test_tokenizer_metrics_static_methods() {
+        // Test that all tokenizer metric methods can be called without panic
+
+        // Encoding metrics
TokenizerMetrics::record_encode_request("huggingface"); + TokenizerMetrics::record_encode_duration(Duration::from_millis(10)); + TokenizerMetrics::record_encode_error("invalid_input"); + TokenizerMetrics::record_tokens_per_encode(100); + TokenizerMetrics::record_chars_per_encode(500); + + // Decoding metrics + TokenizerMetrics::record_decode_request("huggingface"); + TokenizerMetrics::record_decode_duration(Duration::from_millis(5)); + TokenizerMetrics::record_decode_error("invalid_tokens"); + TokenizerMetrics::record_tokens_per_decode(50); + + // Batch encoding + TokenizerMetrics::record_encode_batch_duration(Duration::from_millis(100), 10); + + // Stop sequence detection + TokenizerMetrics::record_stop_sequence_detected("token"); + TokenizerMetrics::record_stop_sequence_detected("string"); + TokenizerMetrics::record_partial_match(); + TokenizerMetrics::record_stop_detection_duration(Duration::from_micros(100)); + + // Streaming decode + TokenizerMetrics::record_stream_token(); + TokenizerMetrics::record_incomplete_utf8(); + TokenizerMetrics::record_stream_step_duration(Duration::from_micros(50)); + + // Factory metrics + TokenizerMetrics::record_factory_load("json"); + TokenizerMetrics::record_factory_error("unsupported_format"); + TokenizerMetrics::record_factory_load_duration(Duration::from_millis(200)); + + // Vocabulary metrics + TokenizerMetrics::set_vocab_size("huggingface", 50000); + } + // ============= Port Availability Tests ============= #[test] diff --git a/sgl-router/src/tokenizer/factory.rs b/sgl-router/src/tokenizer/factory.rs index 6639f35a1..04b950d3c 100644 --- a/sgl-router/src/tokenizer/factory.rs +++ b/sgl-router/src/tokenizer/factory.rs @@ -1,9 +1,11 @@ -use super::traits; +use super::{traits, TokenizerTrait}; +use crate::metrics::TokenizerMetrics; use anyhow::{Error, Result}; use std::fs::File; use std::io::Read; use std::path::Path; use std::sync::Arc; +use std::time::Instant; #[cfg(feature = "huggingface")] use super::huggingface::HuggingFaceTokenizer; @@ -22,6 +24,8 @@ pub enum TokenizerType { /// - json: HuggingFace tokenizer /// - For testing: can return mock tokenizer pub fn create_tokenizer_from_file(file_path: &str) -> Result> { + let start_time = Instant::now(); + // Special case for testing if file_path == "mock" || file_path == "test" { return Ok(Arc::new(super::mock::MockTokenizer::new())); @@ -31,6 +35,7 @@ pub fn create_tokenizer_from_file(file_path: &str) -> Result Result { #[cfg(feature = "huggingface")] { let tokenizer = HuggingFaceTokenizer::from_file(file_path)?; - Ok(Arc::new(tokenizer)) + + TokenizerMetrics::record_factory_load("json"); + TokenizerMetrics::set_vocab_size("huggingface", tokenizer.vocab_size()); + + Ok(Arc::new(tokenizer) as Arc) } #[cfg(not(feature = "huggingface"))] { + TokenizerMetrics::record_factory_error("huggingface_disabled"); Err(Error::msg( "HuggingFace support not enabled. 
                     "HuggingFace support not enabled. Enable the 'huggingface' feature.",
                 ))
@@ -56,17 +66,27 @@ pub fn create_tokenizer_from_file(file_path: &str) -> Result<Arc<dyn traits::Tokenizer>> {
         }
         Some("model") => {
             // SentencePiece model file
+            TokenizerMetrics::record_factory_error("unsupported_sentencepiece");
             Err(Error::msg("SentencePiece models not yet supported"))
         }
         Some("gguf") => {
             // GGUF format
+            TokenizerMetrics::record_factory_error("unsupported_gguf");
             Err(Error::msg("GGUF format not yet supported"))
         }
         _ => {
             // Try to auto-detect by reading file content
-            auto_detect_tokenizer(file_path)
+            auto_detect_tokenizer(file_path).inspect(|tokenizer| {
+                TokenizerMetrics::record_factory_load("auto_detected");
+                TokenizerMetrics::set_vocab_size("auto_detected", tokenizer.vocab_size());
+            })
         }
-    }
+    };
+
+    if result.is_ok() {
+        TokenizerMetrics::record_factory_load_duration(start_time.elapsed());
+    }
+    result
 }
 
 /// Auto-detect tokenizer type by examining file content
diff --git a/sgl-router/src/tokenizer/huggingface.rs b/sgl-router/src/tokenizer/huggingface.rs
index 70eabfc4a..ec07ce6d8 100644
--- a/sgl-router/src/tokenizer/huggingface.rs
+++ b/sgl-router/src/tokenizer/huggingface.rs
@@ -1,6 +1,8 @@
 use super::traits::{Decoder, Encoder, Encoding, SpecialTokens, Tokenizer as TokenizerTrait};
+use crate::metrics::TokenizerMetrics;
 use anyhow::{Error, Result};
 use std::collections::HashMap;
+use std::time::Instant;
 use tokenizers::tokenizer::Tokenizer as HfTokenizer;
 
 /// HuggingFace tokenizer wrapper
@@ -92,19 +94,36 @@ impl HuggingFaceTokenizer {
 
 impl Encoder for HuggingFaceTokenizer {
     fn encode(&self, input: &str) -> Result<Encoding> {
-        let encoding = self
-            .tokenizer
-            .encode(input, false)
-            .map_err(|e| Error::msg(format!("Encoding failed: {}", e)))?;
+        let start = Instant::now();
 
-        Ok(Encoding::Hf(Box::new(encoding)))
+        TokenizerMetrics::record_encode_request("huggingface");
+        TokenizerMetrics::record_chars_per_encode(input.len());
+
+        self.tokenizer
+            .encode(input, false)
+            .map_err(|e| {
+                TokenizerMetrics::record_encode_error("encoding_failed");
+                Error::msg(format!("Encoding failed: {}", e))
+            })
+            .map(|encoding| {
+                TokenizerMetrics::record_tokens_per_encode(encoding.get_ids().len());
+                TokenizerMetrics::record_encode_duration(start.elapsed());
+                Encoding::Hf(Box::new(encoding))
+            })
     }
 
     fn encode_batch(&self, inputs: &[&str]) -> Result<Vec<Encoding>> {
+        let start = Instant::now();
+
         let encodings = self
             .tokenizer
             .encode_batch(inputs.to_vec(), false)
-            .map_err(|e| Error::msg(format!("Batch encoding failed: {}", e)))?;
+            .map_err(|e| {
+                TokenizerMetrics::record_encode_error("batch_encoding_failed");
+                Error::msg(format!("Batch encoding failed: {}", e))
+            })?;
+
+        TokenizerMetrics::record_encode_batch_duration(start.elapsed(), inputs.len());
 
         Ok(encodings
             .into_iter()
@@ -115,9 +134,20 @@ impl Encoder for HuggingFaceTokenizer {
 
 impl Decoder for HuggingFaceTokenizer {
     fn decode(&self, token_ids: &[u32], skip_special_tokens: bool) -> Result<String> {
+        let start = Instant::now();
+
+        TokenizerMetrics::record_decode_request("huggingface");
+        TokenizerMetrics::record_tokens_per_decode(token_ids.len());
+
         self.tokenizer
             .decode(token_ids, skip_special_tokens)
-            .map_err(|e| Error::msg(format!("Decoding failed: {}", e)))
+            .map_err(|e| {
+                TokenizerMetrics::record_decode_error("decoding_failed");
+                Error::msg(format!("Decoding failed: {}", e))
+            })
+            .inspect(|_| {
+                TokenizerMetrics::record_decode_duration(start.elapsed());
+            })
     }
 }
 
diff --git a/sgl-router/src/tokenizer/stop.rs b/sgl-router/src/tokenizer/stop.rs
index 19dd60802..96a6d4c9e 100644
--- a/sgl-router/src/tokenizer/stop.rs
+++ b/sgl-router/src/tokenizer/stop.rs
@@ -1,7 +1,9 @@
 use super::traits;
+use crate::metrics::TokenizerMetrics;
 use anyhow::Result;
 use std::collections::HashSet;
 use std::sync::Arc;
+use std::time::Instant;
 
 /// Output from the sequence decoder
 #[derive(Debug, Clone, PartialEq)]
@@ -93,6 +95,8 @@ impl StopSequenceDecoder {
 
     /// Process a single token
     pub fn process_token(&mut self, token_id: u32) -> Result<SequenceDecoderOutput> {
+        let start = Instant::now();
+
         if self.stopped {
             return Ok(SequenceDecoderOutput::Stopped);
         }
@@ -100,23 +104,30 @@ impl StopSequenceDecoder {
         // Check for token-level stops first
         if self.config.stop_tokens.contains(&token_id) {
             self.stopped = true;
+            TokenizerMetrics::record_stop_sequence_detected("token");
+
             // Flush any jailed text before stopping
             if !self.jail_buffer.is_empty() {
                 let output = self.jail_buffer.clone();
                 self.jail_buffer.clear();
+                TokenizerMetrics::record_stop_detection_duration(start.elapsed());
                 return Ok(SequenceDecoderOutput::StoppedWithText(output));
             }
+            TokenizerMetrics::record_stop_detection_duration(start.elapsed());
             return Ok(SequenceDecoderOutput::Stopped);
         }
 
         if self.config.visible_stop_tokens.contains(&token_id) {
             self.stopped = true;
+            TokenizerMetrics::record_stop_sequence_detected("visible_token");
+
             // Include jailed text plus the stop token
             let stop_text = self
                 .tokenizer
                 .decode(&[token_id], self.skip_special_tokens)?;
             let output = format!("{}{}", self.jail_buffer, stop_text);
             self.jail_buffer.clear();
+            TokenizerMetrics::record_stop_detection_duration(start.elapsed());
             return Ok(SequenceDecoderOutput::StoppedWithText(output));
         }
 
@@ -161,9 +172,12 @@ impl StopSequenceDecoder {
         for stop_seq in &self.config.stop_sequences {
             if let Some(pos) = check_text.find(stop_seq) {
                 self.stopped = true;
+                TokenizerMetrics::record_stop_sequence_detected("string");
+
                 // Output text before the stop sequence
                 let output = check_text[..pos].to_string();
                 self.jail_buffer.clear();
+                TokenizerMetrics::record_stop_detection_duration(start.elapsed());
                 return Ok(if output.is_empty() {
                     SequenceDecoderOutput::Stopped
                 } else {
@@ -176,10 +190,13 @@ impl StopSequenceDecoder {
         for stop_seq in &self.config.visible_stop_sequences {
             if let Some(pos) = check_text.find(stop_seq) {
                 self.stopped = true;
+                TokenizerMetrics::record_stop_sequence_detected("visible_string");
+
                 // Include the stop sequence in output
                 let end_pos = pos + stop_seq.len();
                 let output = check_text[..end_pos].to_string();
                 self.jail_buffer.clear();
+                TokenizerMetrics::record_stop_detection_duration(start.elapsed());
                 return Ok(SequenceDecoderOutput::StoppedWithText(output));
             }
         }
@@ -202,6 +219,8 @@ impl StopSequenceDecoder {
         }
 
         if partial_match_len > 0 {
+            TokenizerMetrics::record_partial_match();
+
             // Split: output safe text, jail the potential match
             let safe_end = check_text.len() - partial_match_len;
             let safe_text = &check_text[..safe_end];
@@ -211,6 +230,8 @@ impl StopSequenceDecoder {
             self.prefix_offset = self.read_offset;
             self.read_offset = self.token_buffer.len();
 
+            TokenizerMetrics::record_stop_detection_duration(start.elapsed());
+
             if safe_text.is_empty() {
                 Ok(SequenceDecoderOutput::Held)
             } else {
@@ -224,6 +245,8 @@ impl StopSequenceDecoder {
             self.prefix_offset = self.read_offset;
             self.read_offset = self.token_buffer.len();
 
+            TokenizerMetrics::record_stop_detection_duration(start.elapsed());
+
             Ok(SequenceDecoderOutput::Text(check_text))
         }
     }
diff --git a/sgl-router/src/tokenizer/stream.rs b/sgl-router/src/tokenizer/stream.rs
index 6b236b03f..8ff3abe28 100644
--- a/sgl-router/src/tokenizer/stream.rs
+++ b/sgl-router/src/tokenizer/stream.rs
@@ -1,8 +1,10 @@
 // src/tokenizer/stream.rs
 
 use super::traits;
+use crate::metrics::TokenizerMetrics;
 use anyhow::Result;
 use std::sync::Arc;
+use std::time::Instant;
 
 const INITIAL_INCREMENTAL_DETOKENIZATION_OFFSET: usize = 5;
 
@@ -43,8 +45,12 @@ impl DecodeStream {
     /// Step appends a token_id to the internal state and tries to produce a text chunk.
     /// Returning `None` means the given id is not enough to produce a chunk.
     pub fn step(&mut self, id: u32) -> Result<Option<String>> {
+        let start = Instant::now();
+
         self.all_token_ids.push(id);
 
+        TokenizerMetrics::record_stream_token();
+
         let prefix_text = self.tokenizer.decode(
             &self.all_token_ids[self.prefix_offset..self.read_offset],
             self.skip_special_tokens,
@@ -61,8 +67,16 @@ impl DecodeStream {
             self.prefix_offset = self.read_offset;
             self.read_offset = self.all_token_ids.len();
 
+            TokenizerMetrics::record_stream_step_duration(start.elapsed());
+
             Ok(Some(new_text))
         } else {
+            if new_text.ends_with("�") {
+                TokenizerMetrics::record_incomplete_utf8();
+            }
+
+            TokenizerMetrics::record_stream_step_duration(start.elapsed());
+
             Ok(None)
         }
     }
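
Usage note (not part of the patch): the metrics above are recorded as a side effect of ordinary tokenizer calls, so a caller only loads a tokenizer and encodes/decodes as usual, then scrapes the Prometheus endpoint started by start_prometheus. Below is a minimal sketch; the module path, the availability of encode/decode on the returned trait object, and the "tokenizer.json" path are assumptions for illustration, not part of this diff.

    use crate::tokenizer::factory::create_tokenizer_from_file;

    fn tokenize_once() -> anyhow::Result<()> {
        // Loading a HuggingFace tokenizer.json bumps sgl_tokenizer_factory_loads_total,
        // records the factory load-duration histogram, and sets sgl_tokenizer_vocab_size.
        let tokenizer = create_tokenizer_from_file("tokenizer.json")?;

        // For the HuggingFace-backed tokenizer, encode() records the request counter,
        // chars/tokens histograms, and encode duration; decode() records the decode-side metrics.
        let _encoding = tokenizer.encode("hello world")?;
        let _text = tokenizer.decode(&[1, 2, 3], true)?;
        Ok(())
    }

After a few calls, the new series show up with their labels on the metrics endpoint, e.g. sgl_tokenizer_encode_requests_total{tokenizer_type="huggingface"}.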