[router] cleanup app context and move to startup (#11617)

This commit is contained in:
Simo Lin
2025-10-14 13:19:28 -04:00
committed by GitHub
parent eb8cac6fe2
commit 3962e39d7c
5 changed files with 341 additions and 197 deletions

View File

@@ -7,8 +7,8 @@ use crate::{
data_connector::{
MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage,
NoOpConversationStorage, NoOpResponseStorage, OracleConversationItemStorage,
OracleConversationStorage, OracleResponseStorage, SharedConversationStorage,
SharedResponseStorage,
OracleConversationStorage, OracleResponseStorage, SharedConversationItemStorage,
SharedConversationStorage, SharedResponseStorage,
},
logging::{self, LoggingConfig},
metrics::{self, PrometheusConfig},
@@ -62,7 +62,7 @@ pub struct AppContext {
pub router_manager: Option<Arc<RouterManager>>,
pub response_storage: SharedResponseStorage,
pub conversation_storage: SharedConversationStorage,
pub conversation_item_storage: crate::data_connector::SharedConversationItemStorage,
pub conversation_item_storage: SharedConversationItemStorage,
pub load_monitor: Option<Arc<LoadMonitor>>,
pub configured_reasoning_parser: Option<String>,
pub configured_tool_parser: Option<String>,
@@ -70,135 +70,26 @@ pub struct AppContext {
}
impl AppContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
router_config: RouterConfig,
client: Client,
max_concurrent_requests: i32,
rate_limit_tokens_per_second: Option<i32>,
) -> Result<Self, String> {
let rate_limiter = match max_concurrent_requests {
n if n <= 0 => None,
n => {
let rate_limit_tokens =
rate_limit_tokens_per_second.filter(|&t| t > 0).unwrap_or(n);
Some(Arc::new(TokenBucket::new(
n as usize,
rate_limit_tokens as usize,
)))
}
};
let (tokenizer, reasoning_parser_factory, tool_parser_factory) = if router_config
.connection_mode
== ConnectionMode::Grpc
{
let tokenizer_path = router_config
.tokenizer_path
.clone()
.or_else(|| router_config.model_path.clone())
.ok_or_else(|| {
"gRPC mode requires either --tokenizer-path or --model-path to be specified"
.to_string()
})?;
let tokenizer = Some(
tokenizer_factory::create_tokenizer_with_chat_template_blocking(
&tokenizer_path,
router_config.chat_template.as_deref(),
)
.map_err(|e| {
format!(
"Failed to create tokenizer from '{}': {}. \
Ensure the path is valid and points to a tokenizer file (tokenizer.json) \
or a HuggingFace model ID. For directories, ensure they contain tokenizer files.",
tokenizer_path, e
)
})?,
);
let reasoning_parser_factory = Some(crate::reasoning_parser::ParserFactory::new());
let tool_parser_factory = Some(crate::tool_parser::ParserFactory::new());
(tokenizer, reasoning_parser_factory, tool_parser_factory)
} else {
(None, None, None)
};
let worker_registry = Arc::new(WorkerRegistry::new());
let policy_registry = Arc::new(PolicyRegistry::new(router_config.policy.clone()));
let router_manager = None;
let (response_storage, conversation_storage): (
SharedResponseStorage,
SharedConversationStorage,
) = match router_config.history_backend {
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
(
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
)
}
HistoryBackend::None => {
info!("Initializing data connector: None (no persistence)");
(
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
)
}
HistoryBackend::Oracle => {
let oracle_cfg = router_config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!(
"Initializing data connector: Oracle ATP (pool: {}-{})",
oracle_cfg.pool_min, oracle_cfg.pool_max
);
let response_storage =
OracleResponseStorage::new(oracle_cfg.clone()).map_err(|err| {
format!("failed to initialize Oracle response storage: {err}")
})?;
let conversation_storage = OracleConversationStorage::new(oracle_cfg.clone())
.map_err(|err| {
format!("failed to initialize Oracle conversation storage: {err}")
})?;
info!("Data connector initialized successfully: Oracle ATP");
(Arc::new(response_storage), Arc::new(conversation_storage))
}
};
// Conversation items storage (memory-backed for now)
let conversation_item_storage: crate::data_connector::SharedConversationItemStorage =
match router_config.history_backend {
HistoryBackend::Oracle => {
let oracle_cfg = router_config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
Arc::new(OracleConversationItemStorage::new(oracle_cfg).map_err(|e| {
format!("failed to initialize Oracle conversation item storage: {e}")
})?)
}
_ => Arc::new(MemoryConversationItemStorage::new()),
};
let load_monitor = Some(Arc::new(LoadMonitor::new(
worker_registry.clone(),
policy_registry.clone(),
client.clone(),
router_config.worker_startup_check_interval_secs,
)));
rate_limiter: Option<Arc<TokenBucket>>,
tokenizer: Option<Arc<dyn Tokenizer>>,
reasoning_parser_factory: Option<ReasoningParserFactory>,
tool_parser_factory: Option<ToolParserFactory>,
worker_registry: Arc<WorkerRegistry>,
policy_registry: Arc<PolicyRegistry>,
response_storage: SharedResponseStorage,
conversation_storage: SharedConversationStorage,
conversation_item_storage: SharedConversationItemStorage,
load_monitor: Option<Arc<LoadMonitor>>,
worker_job_queue: Arc<OnceLock<Arc<JobQueue>>>,
) -> Self {
let configured_reasoning_parser = router_config.reasoning_parser.clone();
let configured_tool_parser = router_config.tool_call_parser.clone();
// Create empty OnceLock for worker job queue (will be initialized in startup())
let worker_job_queue = Arc::new(OnceLock::new());
Ok(Self {
Self {
client,
router_config,
rate_limiter,
@@ -207,7 +98,7 @@ impl AppContext {
tool_parser_factory,
worker_registry,
policy_registry,
router_manager,
router_manager: None,
response_storage,
conversation_storage,
conversation_item_storage,
@@ -215,7 +106,7 @@ impl AppContext {
configured_reasoning_parser,
configured_tool_parser,
worker_job_queue,
})
}
}
}
@@ -936,12 +827,146 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
.build()
.expect("Failed to create HTTP client");
// Initialize rate limiter
let rate_limiter = match config.router_config.max_concurrent_requests {
n if n <= 0 => None,
n => {
let rate_limit_tokens = config
.router_config
.rate_limit_tokens_per_second
.filter(|&t| t > 0)
.unwrap_or(n);
Some(Arc::new(TokenBucket::new(
n as usize,
rate_limit_tokens as usize,
)))
}
};
// Initialize tokenizer and parser factories for gRPC mode
let (tokenizer, reasoning_parser_factory, tool_parser_factory) = if config
.router_config
.connection_mode
== ConnectionMode::Grpc
{
let tokenizer_path = config
.router_config
.tokenizer_path
.clone()
.or_else(|| config.router_config.model_path.clone())
.ok_or_else(|| {
"gRPC mode requires either --tokenizer-path or --model-path to be specified"
.to_string()
})?;
let tokenizer = Some(
tokenizer_factory::create_tokenizer_with_chat_template_blocking(
&tokenizer_path,
config.router_config.chat_template.as_deref(),
)
.map_err(|e| {
format!(
"Failed to create tokenizer from '{}': {}. \
Ensure the path is valid and points to a tokenizer file (tokenizer.json) \
or a HuggingFace model ID. For directories, ensure they contain tokenizer files.",
tokenizer_path, e
)
})?,
);
let reasoning_parser_factory = Some(ReasoningParserFactory::new());
let tool_parser_factory = Some(ToolParserFactory::new());
(tokenizer, reasoning_parser_factory, tool_parser_factory)
} else {
(None, None, None)
};
// Initialize worker registry and policy registry
let worker_registry = Arc::new(WorkerRegistry::new());
let policy_registry = Arc::new(PolicyRegistry::new(config.router_config.policy.clone()));
// Initialize storage backends
let (response_storage, conversation_storage): (
SharedResponseStorage,
SharedConversationStorage,
) = match config.router_config.history_backend {
HistoryBackend::Memory => {
info!("Initializing data connector: Memory");
(
Arc::new(MemoryResponseStorage::new()),
Arc::new(MemoryConversationStorage::new()),
)
}
HistoryBackend::None => {
info!("Initializing data connector: None (no persistence)");
(
Arc::new(NoOpResponseStorage::new()),
Arc::new(NoOpConversationStorage::new()),
)
}
HistoryBackend::Oracle => {
let oracle_cfg = config.router_config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
info!(
"Initializing data connector: Oracle ATP (pool: {}-{})",
oracle_cfg.pool_min, oracle_cfg.pool_max
);
let response_storage = OracleResponseStorage::new(oracle_cfg.clone())
.map_err(|err| format!("failed to initialize Oracle response storage: {err}"))?;
let conversation_storage =
OracleConversationStorage::new(oracle_cfg.clone()).map_err(|err| {
format!("failed to initialize Oracle conversation storage: {err}")
})?;
info!("Data connector initialized successfully: Oracle ATP");
(Arc::new(response_storage), Arc::new(conversation_storage))
}
};
// Initialize conversation items storage
let conversation_item_storage: SharedConversationItemStorage =
match config.router_config.history_backend {
HistoryBackend::Oracle => {
let oracle_cfg = config.router_config.oracle.clone().ok_or_else(|| {
"oracle configuration is required when history_backend=oracle".to_string()
})?;
Arc::new(OracleConversationItemStorage::new(oracle_cfg).map_err(|e| {
format!("failed to initialize Oracle conversation item storage: {e}")
})?)
}
_ => Arc::new(MemoryConversationItemStorage::new()),
};
// Initialize load monitor
let load_monitor = Some(Arc::new(LoadMonitor::new(
worker_registry.clone(),
policy_registry.clone(),
client.clone(),
config.router_config.worker_startup_check_interval_secs,
)));
// Create empty OnceLock for worker job queue (will be initialized below)
let worker_job_queue = Arc::new(OnceLock::new());
// Create AppContext with all initialized components
let app_context = AppContext::new(
config.router_config.clone(),
client.clone(),
config.router_config.max_concurrent_requests,
config.router_config.rate_limit_tokens_per_second,
)?;
rate_limiter,
tokenizer,
reasoning_parser_factory,
tool_parser_factory,
worker_registry,
policy_registry,
response_storage,
conversation_storage,
conversation_item_storage,
load_monitor,
worker_job_queue,
);
let app_context = Arc::new(app_context);