From 5d4fe1ceeef208b8150fb57896bb9782c3901eed Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Fri, 26 Sep 2025 06:57:57 -0400 Subject: [PATCH] [router] add move grpc worker management from router to worker manager (#10960) --- sgl-router/src/core/worker_manager.rs | 183 +++++++++++++++++++++-- sgl-router/src/routers/factory.rs | 128 ++++++---------- sgl-router/src/routers/grpc/pd_router.rs | 121 ++------------- sgl-router/src/routers/grpc/router.rs | 88 +---------- 4 files changed, 232 insertions(+), 288 deletions(-) diff --git a/sgl-router/src/core/worker_manager.rs b/sgl-router/src/core/worker_manager.rs index 0fc8e8b0e..323d5cdd9 100644 --- a/sgl-router/src/core/worker_manager.rs +++ b/sgl-router/src/core/worker_manager.rs @@ -181,29 +181,55 @@ impl WorkerManager { ) -> Result<(), String> { info!("Starting worker initialization"); + // Determine connection mode from config + let connection_mode = &config.connection_mode; + match &config.mode { - RoutingMode::Regular { worker_urls } => { - Self::initialize_regular_workers(worker_urls, config, registry, policy_registry) + RoutingMode::Regular { worker_urls } => match connection_mode { + ConfigConnectionMode::Http => { + Self::initialize_regular_workers( + worker_urls, + config, + registry, + policy_registry, + ) .await?; - } + } + ConfigConnectionMode::Grpc => { + Self::initialize_grpc_workers(worker_urls, config, registry, policy_registry) + .await?; + } + }, RoutingMode::PrefillDecode { prefill_urls, decode_urls, .. - } => { - let prefill_entries: Vec<(&String, &Option)> = - prefill_urls.iter().map(|(url, port)| (url, port)).collect(); + } => match connection_mode { + ConfigConnectionMode::Http => { + let prefill_entries: Vec<(&String, &Option)> = + prefill_urls.iter().map(|(url, port)| (url, port)).collect(); - Self::initialize_prefill_workers( - &prefill_entries, - config, - registry, - policy_registry, - ) - .await?; - Self::initialize_decode_workers(decode_urls, config, registry, policy_registry) + Self::initialize_prefill_workers( + &prefill_entries, + config, + registry, + policy_registry, + ) .await?; - } + Self::initialize_decode_workers(decode_urls, config, registry, policy_registry) + .await?; + } + ConfigConnectionMode::Grpc => { + Self::initialize_grpc_pd_workers( + prefill_urls, + decode_urls, + config, + registry, + policy_registry, + ) + .await?; + } + }, RoutingMode::OpenAI { .. } => { info!("OpenAI routing mode - no workers to initialize"); } @@ -397,6 +423,133 @@ impl WorkerManager { Ok(()) } + /// Initialize gRPC workers for regular mode + async fn initialize_grpc_workers( + urls: &[String], + config: &RouterConfig, + registry: &Arc, + policy_registry: Option<&Arc>, + ) -> Result<(), String> { + info!("Creating {} gRPC regular workers", urls.len()); + + let circuit_breaker_config = + Self::convert_circuit_breaker_config(&config.effective_circuit_breaker_config()); + let health_config = Self::convert_health_config(&config.health_check); + let connection_mode = ConnectionMode::Grpc { port: None }; + + let mut registered_workers: HashMap>> = HashMap::new(); + + for url in urls { + let worker = Self::create_basic_worker( + url.clone(), + WorkerType::Regular, + connection_mode.clone(), + config.api_key.clone(), + None, + circuit_breaker_config.clone(), + health_config.clone(), + ); + Self::register_worker(worker, registry, &mut registered_workers, policy_registry); + info!( + "Registered gRPC worker at {} (will connect on first use)", + url + ); + } + + Self::initialize_cache_policies(®istered_workers, registry, policy_registry); + Ok(()) + } + + /// Initialize gRPC PD (Prefill-Decode) workers + async fn initialize_grpc_pd_workers( + prefill_urls: &[(String, Option)], + decode_urls: &[String], + config: &RouterConfig, + registry: &Arc, + policy_registry: Option<&Arc>, + ) -> Result<(), String> { + info!( + "Creating {} gRPC prefill workers and {} gRPC decode workers", + prefill_urls.len(), + decode_urls.len() + ); + + let circuit_breaker_config = + Self::convert_circuit_breaker_config(&config.effective_circuit_breaker_config()); + let health_config = Self::convert_health_config(&config.health_check); + + let mut registered_prefill_workers: HashMap>> = HashMap::new(); + let mut registered_decode_workers: HashMap>> = HashMap::new(); + + for (url, bootstrap_port) in prefill_urls { + let worker_type = WorkerType::Prefill { + bootstrap_port: *bootstrap_port, + }; + let connection_mode = ConnectionMode::Grpc { + port: *bootstrap_port, + }; + + let worker = Self::create_basic_worker( + url.clone(), + worker_type, + connection_mode, + config.api_key.clone(), + None, + circuit_breaker_config.clone(), + health_config.clone(), + ); + Self::register_worker( + worker, + registry, + &mut registered_prefill_workers, + policy_registry, + ); + info!( + "Registered gRPC prefill worker at {} (will connect on first use)", + url + ); + } + + // Create decode workers + for url in decode_urls { + let connection_mode = ConnectionMode::Grpc { port: None }; + + let worker = Self::create_basic_worker( + url.clone(), + WorkerType::Decode, + connection_mode, + config.api_key.clone(), + None, + circuit_breaker_config.clone(), + health_config.clone(), + ); + Self::register_worker( + worker, + registry, + &mut registered_decode_workers, + policy_registry, + ); + info!( + "Registered gRPC decode worker at {} (will connect on first use)", + url + ); + } + + if let Some(policy_reg) = policy_registry { + let all_prefill_workers: Vec> = registered_prefill_workers + .values() + .flat_map(|workers| workers.iter().cloned()) + .collect(); + let all_decode_workers: Vec> = registered_decode_workers + .values() + .flat_map(|workers| workers.iter().cloned()) + .collect(); + policy_reg.init_pd_cache_aware_policies(&all_prefill_workers, &all_decode_workers); + } + + Ok(()) + } + /// Add a worker from a configuration request pub async fn add_worker_from_config( config: &WorkerConfigRequest, diff --git a/sgl-router/src/routers/factory.rs b/sgl-router/src/routers/factory.rs index a5452a6cf..ef9e21c0a 100644 --- a/sgl-router/src/routers/factory.rs +++ b/sgl-router/src/routers/factory.rs @@ -1,5 +1,7 @@ //! Factory for creating router instances +use super::grpc::pd_router::GrpcPDRouter; +use super::grpc::router::GrpcRouter; use super::{ http::{openai_router::OpenAIRouter, pd_router::PDRouter, router::Router}, RouterTrait, @@ -15,61 +17,45 @@ pub struct RouterFactory; impl RouterFactory { /// Create a router instance from application context pub async fn create_router(ctx: &Arc) -> Result, String> { - // Check connection mode and route to appropriate implementation match ctx.router_config.connection_mode { - ConnectionMode::Grpc => { - // Route to gRPC implementation based on routing mode - match &ctx.router_config.mode { - RoutingMode::Regular { worker_urls } => { - Self::create_grpc_router(worker_urls, &ctx.router_config.policy, ctx).await - } - RoutingMode::PrefillDecode { - prefill_urls, - decode_urls, - prefill_policy, - decode_policy, - } => { - Self::create_grpc_pd_router( - prefill_urls, - decode_urls, - prefill_policy.as_ref(), - decode_policy.as_ref(), - &ctx.router_config.policy, - ctx, - ) - .await - } - RoutingMode::OpenAI { .. } => { - Err("OpenAI mode requires HTTP connection_mode".to_string()) - } + ConnectionMode::Grpc => match &ctx.router_config.mode { + RoutingMode::Regular { .. } => Self::create_grpc_router(ctx).await, + RoutingMode::PrefillDecode { + prefill_policy, + decode_policy, + .. + } => { + Self::create_grpc_pd_router( + prefill_policy.as_ref(), + decode_policy.as_ref(), + &ctx.router_config.policy, + ctx, + ) + .await } - } - ConnectionMode::Http => { - // Route to HTTP implementation based on routing mode - match &ctx.router_config.mode { - RoutingMode::Regular { .. } => { - // Workers already initialized in registry - Self::create_regular_router(ctx).await - } - RoutingMode::PrefillDecode { - prefill_policy, - decode_policy, - .. - } => { - // Workers already initialized in registry - Self::create_pd_router( - prefill_policy.as_ref(), - decode_policy.as_ref(), - &ctx.router_config.policy, - ctx, - ) - .await - } - RoutingMode::OpenAI { worker_urls, .. } => { - Self::create_openai_router(worker_urls.clone(), ctx).await - } + RoutingMode::OpenAI { .. } => { + Err("OpenAI mode requires HTTP connection_mode".to_string()) } - } + }, + ConnectionMode::Http => match &ctx.router_config.mode { + RoutingMode::Regular { .. } => Self::create_regular_router(ctx).await, + RoutingMode::PrefillDecode { + prefill_policy, + decode_policy, + .. + } => { + Self::create_pd_router( + prefill_policy.as_ref(), + decode_policy.as_ref(), + &ctx.router_config.policy, + ctx, + ) + .await + } + RoutingMode::OpenAI { worker_urls, .. } => { + Self::create_openai_router(worker_urls.clone(), ctx).await + } + }, } } @@ -77,8 +63,6 @@ impl RouterFactory { pub async fn create_regular_router( ctx: &Arc, ) -> Result, String> { - // Create regular router with context - // Workers should already be initialized in the registry let router = Router::new(ctx).await?; Ok(Box::new(router)) @@ -91,66 +75,41 @@ impl RouterFactory { main_policy_config: &PolicyConfig, ctx: &Arc, ) -> Result, String> { - // Initialize policies in PolicyRegistry - use specific policies if provided, otherwise fall back to main policy let prefill_policy = PolicyFactory::create_from_config(prefill_policy_config.unwrap_or(main_policy_config)); let decode_policy = PolicyFactory::create_from_config(decode_policy_config.unwrap_or(main_policy_config)); - // Set the prefill and decode policies in the registry ctx.policy_registry.set_prefill_policy(prefill_policy); ctx.policy_registry.set_decode_policy(decode_policy); - // Create PD router with context (policies are in PolicyRegistry) - // Workers should already be initialized in the registry let router = PDRouter::new(ctx).await?; Ok(Box::new(router)) } /// Create a gRPC router with injected policy - pub async fn create_grpc_router( - worker_urls: &[String], - policy_config: &PolicyConfig, - ctx: &Arc, - ) -> Result, String> { - use super::grpc::router::GrpcRouter; - - // Create policy - let policy = PolicyFactory::create_from_config(policy_config); - - // Create gRPC router with context - let router = GrpcRouter::new(worker_urls.to_vec(), policy, ctx).await?; + pub async fn create_grpc_router(ctx: &Arc) -> Result, String> { + let router = GrpcRouter::new(ctx).await?; Ok(Box::new(router)) } /// Create a gRPC PD router with tokenizer and worker configuration pub async fn create_grpc_pd_router( - prefill_urls: &[(String, Option)], - decode_urls: &[String], prefill_policy_config: Option<&PolicyConfig>, decode_policy_config: Option<&PolicyConfig>, main_policy_config: &PolicyConfig, ctx: &Arc, ) -> Result, String> { - use super::grpc::pd_router::GrpcPDRouter; - - // Create policies - use specific policies if provided, otherwise fall back to main policy let prefill_policy = PolicyFactory::create_from_config(prefill_policy_config.unwrap_or(main_policy_config)); let decode_policy = PolicyFactory::create_from_config(decode_policy_config.unwrap_or(main_policy_config)); - // Create gRPC PD router with context - let router = GrpcPDRouter::new( - prefill_urls.to_vec(), - decode_urls.to_vec(), - prefill_policy, - decode_policy, - ctx, - ) - .await?; + ctx.policy_registry.set_prefill_policy(prefill_policy); + ctx.policy_registry.set_decode_policy(decode_policy); + let router = GrpcPDRouter::new(ctx).await?; Ok(Box::new(router)) } @@ -160,7 +119,6 @@ impl RouterFactory { worker_urls: Vec, ctx: &Arc, ) -> Result, String> { - // Use the first worker URL as the OpenAI-compatible base let base_url = worker_urls .first() .cloned() diff --git a/sgl-router/src/routers/grpc/pd_router.rs b/sgl-router/src/routers/grpc/pd_router.rs index 6c259456a..760b69c31 100644 --- a/sgl-router/src/routers/grpc/pd_router.rs +++ b/sgl-router/src/routers/grpc/pd_router.rs @@ -1,11 +1,9 @@ // PD (Prefill-Decode) gRPC Router Implementation use crate::config::types::RetryConfig; -use crate::core::{ - BasicWorkerBuilder, CircuitBreakerConfig, HealthConfig, WorkerRegistry, WorkerType, -}; +use crate::core::{WorkerRegistry, WorkerType}; use crate::metrics::RouterMetrics; -use crate::policies::{LoadBalancingPolicy, PolicyRegistry}; +use crate::policies::PolicyRegistry; use crate::reasoning_parser::ParserFactory; use crate::routers::RouterTrait; use crate::tokenizer::traits::Tokenizer; @@ -18,51 +16,29 @@ use axum::{ response::{IntoResponse, Response}, }; use std::sync::Arc; -use std::time::Duration; use tracing::info; /// gRPC PD (Prefill-Decode) router implementation for SGLang #[allow(dead_code)] // Fields will be used once implementation is complete pub struct GrpcPDRouter { - /// Centralized worker registry worker_registry: Arc, - /// Centralized policy registry policy_registry: Arc, - /// Load balancing policy for prefill - prefill_policy: Arc, - /// Load balancing policy for decode - decode_policy: Arc, - /// Tokenizer for handling text encoding/decoding tokenizer: Arc, - /// Reasoning parser factory for structured reasoning outputs reasoning_parser_factory: ParserFactory, - /// Tool parser registry for function/tool calls tool_parser_registry: &'static ParserRegistry, - /// Configuration - timeout_secs: u64, - interval_secs: u64, + dp_aware: bool, api_key: Option, retry_config: RetryConfig, - circuit_breaker_config: CircuitBreakerConfig, } impl GrpcPDRouter { /// Create a new gRPC PD router - pub async fn new( - prefill_urls: Vec<(String, Option)>, - decode_urls: Vec, - prefill_policy: Arc, - decode_policy: Arc, - ctx: &Arc, - ) -> Result { + pub async fn new(ctx: &Arc) -> Result { // Get registries from context let worker_registry = ctx.worker_registry.clone(); let policy_registry = ctx.policy_registry.clone(); - // Update metrics - RouterMetrics::set_active_workers(prefill_urls.len() + decode_urls.len()); - // Extract necessary components from context let tokenizer = ctx .tokenizer @@ -78,67 +54,7 @@ impl GrpcPDRouter { .tool_parser_registry .ok_or_else(|| "gRPC PD router requires tool parser registry".to_string())?; - // Convert config CircuitBreakerConfig to core CircuitBreakerConfig - let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); - let core_cb_config = CircuitBreakerConfig { - failure_threshold: circuit_breaker_config.failure_threshold, - success_threshold: circuit_breaker_config.success_threshold, - timeout_duration: Duration::from_secs(circuit_breaker_config.timeout_duration_secs), - window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs), - }; - - for (url, bootstrap_port) in &prefill_urls { - let worker = BasicWorkerBuilder::new(url.clone()) - .worker_type(WorkerType::Prefill { - bootstrap_port: *bootstrap_port, - }) - .connection_mode(crate::core::ConnectionMode::Grpc { - port: *bootstrap_port, - }) - .circuit_breaker_config(core_cb_config.clone()) - .health_config(HealthConfig { - timeout_secs: ctx.router_config.health_check.timeout_secs, - check_interval_secs: ctx.router_config.health_check.check_interval_secs, - endpoint: ctx.router_config.health_check.endpoint.clone(), - failure_threshold: ctx.router_config.health_check.failure_threshold, - success_threshold: ctx.router_config.health_check.success_threshold, - }) - // No longer passing pre-initialized client - will be created lazily - .build(); - - worker_registry.register(Arc::new(worker)); - info!( - "Registered gRPC prefill worker at {} (will connect on first use)", - url - ); - } - - for url in &decode_urls { - let worker = BasicWorkerBuilder::new(url.clone()) - .worker_type(WorkerType::Decode) - .connection_mode(crate::core::ConnectionMode::Grpc { port: None }) - .circuit_breaker_config(core_cb_config.clone()) - .health_config(HealthConfig { - timeout_secs: ctx.router_config.health_check.timeout_secs, - check_interval_secs: ctx.router_config.health_check.check_interval_secs, - endpoint: ctx.router_config.health_check.endpoint.clone(), - failure_threshold: ctx.router_config.health_check.failure_threshold, - success_threshold: ctx.router_config.health_check.success_threshold, - }) - .build(); - - worker_registry.register(Arc::new(worker)); - info!( - "Registered gRPC decode worker at {} (will connect on first use)", - url - ); - } - - if prefill_urls.is_empty() && decode_urls.is_empty() { - return Err("No gRPC workers configured".to_string()); - } - - // Initialize policies with workers if needed - filter for gRPC workers only + // Get prefill and decode workers from registry - they should have been created by WorkerManager let prefill_workers = worker_registry.get_workers_filtered( None, // any model Some(WorkerType::Prefill { @@ -147,12 +63,6 @@ impl GrpcPDRouter { Some(crate::core::ConnectionMode::Grpc { port: None }), false, // include unhealthy workers during initialization ); - if let Some(cache_aware) = prefill_policy - .as_any() - .downcast_ref::() - { - cache_aware.init_workers(&prefill_workers); - } let decode_workers = worker_registry.get_workers_filtered( None, // any model @@ -160,29 +70,26 @@ impl GrpcPDRouter { Some(crate::core::ConnectionMode::Grpc { port: None }), false, // include unhealthy workers during initialization ); - if let Some(cache_aware) = decode_policy - .as_any() - .downcast_ref::() - { - cache_aware.init_workers(&decode_workers); - } + + // Update metrics + RouterMetrics::set_active_workers(prefill_workers.len() + decode_workers.len()); + info!( + "gRPC PD router found {} prefill and {} decode workers in registry", + prefill_workers.len(), + decode_workers.len() + ); // No need for local health checkers - WorkerRegistry handles health checking Ok(GrpcPDRouter { worker_registry, policy_registry, - prefill_policy, - decode_policy, tokenizer, reasoning_parser_factory, tool_parser_registry, - timeout_secs: ctx.router_config.worker_startup_timeout_secs, - interval_secs: ctx.router_config.worker_startup_check_interval_secs, dp_aware: ctx.router_config.dp_aware, api_key: ctx.router_config.api_key.clone(), retry_config: ctx.router_config.effective_retry_config(), - circuit_breaker_config: core_cb_config, }) } } @@ -206,8 +113,6 @@ impl std::fmt::Debug for GrpcPDRouter { f.debug_struct("GrpcPDRouter") .field("prefill_workers_count", &prefill_workers.len()) .field("decode_workers_count", &decode_workers.len()) - .field("timeout_secs", &self.timeout_secs) - .field("interval_secs", &self.interval_secs) .field("dp_aware", &self.dp_aware) .finish() } diff --git a/sgl-router/src/routers/grpc/router.rs b/sgl-router/src/routers/grpc/router.rs index de8e04f86..58eb7f52b 100644 --- a/sgl-router/src/routers/grpc/router.rs +++ b/sgl-router/src/routers/grpc/router.rs @@ -1,7 +1,6 @@ // gRPC Router Implementation use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use axum::{ @@ -13,12 +12,10 @@ use axum::{ use tracing::{debug, error, info, warn}; use crate::config::types::RetryConfig; -use crate::core::{ - BasicWorkerBuilder, CircuitBreakerConfig, HealthConfig, WorkerRegistry, WorkerType, -}; +use crate::core::{WorkerRegistry, WorkerType}; use crate::grpc_client::{proto, SglangSchedulerClient}; use crate::metrics::RouterMetrics; -use crate::policies::{LoadBalancingPolicy, PolicyRegistry}; +use crate::policies::PolicyRegistry; use crate::protocols::spec::{ChatCompletionRequest, StringOrArray}; use crate::reasoning_parser::ParserFactory; use crate::routers::RouterTrait; @@ -38,39 +35,21 @@ pub struct ProcessedMessages { } /// gRPC router implementation for SGLang -#[allow(dead_code)] // Fields will be used once implementation is complete +#[allow(dead_code)] pub struct GrpcRouter { - /// Centralized worker registry worker_registry: Arc, - /// Centralized policy registry policy_registry: Arc, - /// Load balancing policy - policy: Arc, - /// Tokenizer for handling text encoding/decoding tokenizer: Arc, - /// Reasoning parser factory for structured reasoning outputs reasoning_parser_factory: ParserFactory, - /// Tool parser registry for function/tool calls tool_parser_registry: &'static ParserRegistry, - /// Configuration - timeout_secs: u64, - interval_secs: u64, dp_aware: bool, api_key: Option, retry_config: RetryConfig, - circuit_breaker_config: CircuitBreakerConfig, } impl GrpcRouter { /// Create a new gRPC router - pub async fn new( - worker_urls: Vec, - policy: Arc, - ctx: &Arc, - ) -> Result { - // Update metrics - RouterMetrics::set_active_workers(worker_urls.len()); - + pub async fn new(ctx: &Arc) -> Result { // Extract necessary components from context let tokenizer = ctx .tokenizer @@ -86,77 +65,28 @@ impl GrpcRouter { .tool_parser_registry .ok_or_else(|| "gRPC router requires tool parser registry".to_string())?; - // Convert config CircuitBreakerConfig to core CircuitBreakerConfig - let circuit_breaker_config = ctx.router_config.effective_circuit_breaker_config(); - let core_cb_config = CircuitBreakerConfig { - failure_threshold: circuit_breaker_config.failure_threshold, - success_threshold: circuit_breaker_config.success_threshold, - timeout_duration: Duration::from_secs(circuit_breaker_config.timeout_duration_secs), - window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs), - }; - - // Get registries from context let worker_registry = ctx.worker_registry.clone(); let policy_registry = ctx.policy_registry.clone(); - // Create Worker trait objects with gRPC connection mode and register them - // Workers will lazily initialize their gRPC clients on first use - for url in &worker_urls { - let worker = BasicWorkerBuilder::new(url.clone()) - .worker_type(WorkerType::Regular) - .connection_mode(crate::core::ConnectionMode::Grpc { port: None }) - .circuit_breaker_config(core_cb_config.clone()) - .health_config(HealthConfig { - timeout_secs: ctx.router_config.health_check.timeout_secs, - check_interval_secs: ctx.router_config.health_check.check_interval_secs, - endpoint: ctx.router_config.health_check.endpoint.clone(), - failure_threshold: ctx.router_config.health_check.failure_threshold, - success_threshold: ctx.router_config.health_check.success_threshold, - }) - .build(); - - worker_registry.register(Arc::new(worker)); - info!( - "Registered gRPC worker at {} (will connect on first use)", - url - ); - } - - if worker_urls.is_empty() { - return Err("No gRPC workers configured".to_string()); - } - - // Get only gRPC workers from registry for policy initialization let workers = worker_registry.get_workers_filtered( - None, // any model + None, Some(WorkerType::Regular), Some(crate::core::ConnectionMode::Grpc { port: None }), - false, // include unhealthy workers during initialization + false, ); - // Initialize policy with workers if needed - if let Some(cache_aware) = policy - .as_any() - .downcast_ref::() - { - cache_aware.init_workers(&workers); - } - - // No need for local health checkers - WorkerRegistry handles health checking + RouterMetrics::set_active_workers(workers.len()); + info!("gRPC router found {} workers in registry", workers.len()); Ok(GrpcRouter { worker_registry, policy_registry, - policy, tokenizer, reasoning_parser_factory, tool_parser_registry, - timeout_secs: ctx.router_config.worker_startup_timeout_secs, - interval_secs: ctx.router_config.worker_startup_check_interval_secs, dp_aware: ctx.router_config.dp_aware, api_key: ctx.router_config.api_key.clone(), retry_config: ctx.router_config.effective_retry_config(), - circuit_breaker_config: core_cb_config, }) } @@ -576,8 +506,6 @@ impl std::fmt::Debug for GrpcRouter { let stats = self.worker_registry.stats(); f.debug_struct("GrpcRouter") .field("workers_count", &stats.total_workers) - .field("timeout_secs", &self.timeout_secs) - .field("interval_secs", &self.interval_secs) .field("dp_aware", &self.dp_aware) .finish() }