diff --git a/sgl-router/README.md b/sgl-router/README.md index eaf58187e..e9f77eea6 100644 --- a/sgl-router/README.md +++ b/sgl-router/README.md @@ -79,6 +79,25 @@ python -m sglang_router.launch_router \ --worker-urls http://worker1:8000 http://worker2:8000 ``` +#### OpenAI Backend Mode +Route requests to OpenAI or OpenAI-compatible endpoints: + +```bash +# Route to OpenAI API +python -m sglang_router.launch_router \ + --backend openai \ + --worker-urls https://api.openai.com + +# Route to custom OpenAI-compatible endpoint +python -m sglang_router.launch_router \ + --backend openai \ + --worker-urls http://my-openai-compatible-service:8000 +``` + +**Note**: + - OpenAI backend mode acts as a simple proxy. Load balancing is not applicable in this mode. + - This mode now requires exactly one `--worker-urls` entry. + #### Launch Router with Worker URLs in prefill-decode mode ```bash # Note that the prefill and decode URLs must be provided in the following format: @@ -194,6 +213,65 @@ python -m sglang_router.launch_router \ Default headers: `x-request-id`, `x-correlation-id`, `x-trace-id`, `request-id` +### History Backend (Conversation Storage) + +Store conversation and response data for tracking, debugging, or analytics. + +**NOTE: This feature is currently only supported in OpenAI router mode.** + +#### Available options for history storage + +- **Memory** (default): In-memory storage, fast but ephemeral +- **None**: No storage, minimal overhead +- **Oracle**: Persistent storage using Oracle ATP + +```bash +# Memory backend (default) +python -m sglang_router.launch_router --backend openai \ + --worker-urls https://api.openai.com \ + --history-backend memory + +# No storage for maximum performance +python -m sglang_router.launch_router --backend openai \ + --worker-urls https://api.openai.com \ + --history-backend none + +# Oracle ATP backend + +# Install Oracle Instant Client +# https://download.oracle.com/otn_software/linux/instantclient/2390000/instantclient-basic-linux.x64-23.9.0.25.07.zip +export LD_LIBRARY_PATH=/home/ubuntu/instant-client/instantclient_23_9 + +# choose ONE of the following connection methods: +# Option 1: Using full connection descriptor +export ATP_DSN="(description=(address=(protocol=tcps)(port=1522)(host=adb.region.oraclecloud.com))(connect_data=(service_name=service_name)))" + +# Option 2: Using TNS alias (requires wallet) +export ATP_TNS_ALIAS="sglroutertestatp_high" +export ATP_WALLET_PATH="/path/to/wallet" + +# service user config +export ATP_USER="admin" +export ATP_PASSWORD="YourPassword123" + +python -m sglang_router.launch_router \ + --worker-urls https://api.openai.com \ + --backend openai \ + --history-backend oracle +``` + +**Oracle Configuration Parameters:** +- `--oracle-tns-alias`: TNS alias from tnsnames.ora (env: `ATP_TNS_ALIAS`) + - Requires `--oracle-wallet-path` to locate tnsnames.ora +- `--oracle-username`: Database username (env: `ATP_USER`) +- `--oracle-password`: Database password (env: `ATP_PASSWORD`) +- `--oracle-wallet-path`: Path to wallet directory (env: `ATP_WALLET_PATH`) + - Required when using TNS alias +- `--oracle-pool-min`: Minimum connections (default: 1, env: `ATP_POOL_MIN`) +- `--oracle-pool-max`: Maximum connections (default: 16, env: `ATP_POOL_MAX`) + +**Note**: You must provide **either** `--oracle-tns-alias` **or** `--oracle-connect-descriptor`, but not both. + ## Advanced Features ### Kubernetes Service Discovery @@ -407,6 +485,24 @@ curl -X POST http://localhost:8080/add_worker?url=http://worker3:8000&api_key=wo ### Command Line Arguments Reference +#### Backend Selection +- `--backend`: Backend runtime to use (default: `sglang`) + - `sglang`: SGLang workers (default) + - `openai`: OpenAI or OpenAI-compatible endpoints + +#### History Backend +- `--history-backend`: Storage backend for conversations (default: `memory`) + - `memory`: In-memory storage (default) + - `none`: No storage + - `oracle`: Oracle ATP persistent storage +- `--oracle-tns-alias`: Oracle TNS alias from tnsnames.ora (env: `ATP_TNS_ALIAS`, mutually exclusive with `--oracle-connect-descriptor`) +- `--oracle-connect-descriptor`: Oracle full connection string (env: `ATP_DSN`, mutually exclusive with `--oracle-tns-alias`) +- `--oracle-username`: Oracle username (env: `ATP_USER`) +- `--oracle-password`: Oracle password (env: `ATP_PASSWORD`) +- `--oracle-wallet-path`: Oracle wallet directory (env: `ATP_WALLET_PATH`, required for TNS alias) +- `--oracle-pool-min`: Min pool connections (default: 1, env: `ATP_POOL_MIN`) +- `--oracle-pool-max`: Max pool connections (default: 16, env: `ATP_POOL_MAX`) + #### Service Discovery - `--service-discovery`: Enable Kubernetes service discovery - `--service-discovery-port`: Port for worker URLs (default: 8000) @@ -479,6 +575,8 @@ The continuous integration pipeline includes comprehensive testing, benchmarking - **Random**: Distributes requests randomly across available workers - **Round Robin**: Sequential distribution across workers in rotation - **Prefill-Decode Disaggregation**: Specialized load balancing for separated prefill and decode servers +- **Multiple Backend Support**: Route to SGLang, OpenAI, or other OpenAI-compatible services +- **Conversation Storage**: Track conversations with memory, Oracle ATP, or disable for minimal overhead - **Service Discovery**: Automatic Kubernetes worker discovery and health management - **Monitoring**: Comprehensive Prometheus metrics and structured logging - **Scalability**: Handles thousands of concurrent connections with efficient resource utilization diff --git a/sgl-router/py_src/sglang_router/router.py b/sgl-router/py_src/sglang_router/router.py index 74f2cef73..733848efe 100644 --- a/sgl-router/py_src/sglang_router/router.py +++ b/sgl-router/py_src/sglang_router/router.py @@ -1,7 +1,7 @@ from typing import Optional from sglang_router.router_args import RouterArgs -from sglang_router_rs import PolicyType +from sglang_router_rs import BackendType, HistoryBackendType, PolicyType, PyOracleConfig from sglang_router_rs import Router as _Router @@ -18,6 +18,39 @@ def policy_from_str(policy_str: Optional[str]) -> PolicyType: return policy_map[policy_str] +def backend_from_str(backend_str: Optional[str]) -> BackendType: + """Convert backend string to BackendType enum.""" + if isinstance(backend_str, BackendType): + return backend_str + if backend_str is None: + return BackendType.Sglang + backend_map = {"sglang": BackendType.Sglang, "openai": BackendType.Openai} + backend_lower = backend_str.lower() + if backend_lower not in backend_map: + raise ValueError( + f"Unknown backend: {backend_str}. Valid options: {', '.join(backend_map.keys())}" + ) + return backend_map[backend_lower] + + +def history_backend_from_str(backend_str: Optional[str]) -> HistoryBackendType: + """Convert history backend string to HistoryBackendType enum.""" + if isinstance(backend_str, HistoryBackendType): + return backend_str + if backend_str is None: + return HistoryBackendType.Memory + backend_lower = backend_str.lower() + if backend_lower == "memory": + return HistoryBackendType.Memory + elif backend_lower == "none": + # Use getattr to access 'None' which is a Python keyword + return getattr(HistoryBackendType, "None") + elif backend_lower == "oracle": + return HistoryBackendType.Oracle + else: + raise ValueError(f"Unknown history backend: {backend_str}") + + class Router: """ A high-performance router for distributing requests across worker nodes. @@ -119,8 +152,49 @@ class Router: args_dict["prefill_policy"] = policy_from_str(args_dict["prefill_policy"]) args_dict["decode_policy"] = policy_from_str(args_dict["decode_policy"]) - # remove mini_lb parameter - args_dict.pop("mini_lb") + # Convert backend + args_dict["backend"] = backend_from_str(args_dict.get("backend")) + + # Convert history_backend to enum first + history_backend_raw = args_dict.get("history_backend", "memory") + history_backend = history_backend_from_str(history_backend_raw) + + # Convert Oracle config if needed + oracle_config = None + if history_backend == HistoryBackendType.Oracle: + # Prioritize TNS alias over connect descriptor + tns_alias = args_dict.get("oracle_tns_alias") + connect_descriptor = args_dict.get("oracle_connect_descriptor") + + # Use TNS alias if provided, otherwise use connect descriptor + final_descriptor = tns_alias if tns_alias else connect_descriptor + + oracle_config = PyOracleConfig( + password=args_dict.get("oracle_password"), + username=args_dict.get("oracle_username"), + connect_descriptor=final_descriptor, + wallet_path=args_dict.get("oracle_wallet_path"), + pool_min=args_dict.get("oracle_pool_min", 1), + pool_max=args_dict.get("oracle_pool_max", 16), + pool_timeout_secs=args_dict.get("oracle_pool_timeout_secs", 30), + ) + args_dict["oracle_config"] = oracle_config + args_dict["history_backend"] = history_backend + + # Remove fields that shouldn't be passed to Rust Router constructor + fields_to_remove = [ + "mini_lb", + "oracle_wallet_path", + "oracle_tns_alias", + "oracle_connect_descriptor", + "oracle_username", + "oracle_password", + "oracle_pool_min", + "oracle_pool_max", + "oracle_pool_timeout_secs", + ] + for field in fields_to_remove: + args_dict.pop(field, None) return Router(_Router(**args_dict)) diff --git a/sgl-router/py_src/sglang_router/router_args.py b/sgl-router/py_src/sglang_router/router_args.py index 6d68535b5..587e5a023 100644 --- a/sgl-router/py_src/sglang_router/router_args.py +++ b/sgl-router/py_src/sglang_router/router_args.py @@ -1,6 +1,7 @@ import argparse import dataclasses import logging +import os from typing import Dict, List, Optional logger = logging.getLogger(__name__) @@ -88,6 +89,18 @@ class RouterArgs: chat_template: Optional[str] = None reasoning_parser: Optional[str] = None tool_call_parser: Optional[str] = None + # Backend selection + backend: str = "sglang" + # History backend configuration + history_backend: str = "memory" + oracle_wallet_path: Optional[str] = None + oracle_tns_alias: Optional[str] = None + oracle_connect_descriptor: Optional[str] = None + oracle_username: Optional[str] = None + oracle_password: Optional[str] = None + oracle_pool_min: int = 1 + oracle_pool_max: int = 16 + oracle_pool_timeout_secs: int = 30 @staticmethod def add_cli_args( @@ -466,6 +479,73 @@ class RouterArgs: default=None, help="Specify the parser for handling tool-call interactions", ) + # Backend selection + parser.add_argument( + f"--{prefix}backend", + type=str, + default=RouterArgs.backend, + choices=["sglang", "openai"], + help="Backend runtime to use (default: sglang)", + ) + # History backend configuration + parser.add_argument( + f"--{prefix}history-backend", + type=str, + default=RouterArgs.history_backend, + choices=["memory", "none", "oracle"], + help="History storage backend for conversations and responses (default: memory)", + ) + # Oracle configuration + parser.add_argument( + f"--{prefix}oracle-wallet-path", + type=str, + default=os.getenv("ATP_WALLET_PATH"), + help="Path to Oracle ATP wallet directory (env: ATP_WALLET_PATH)", + ) + parser.add_argument( + f"--{prefix}oracle-tns-alias", + type=str, + default=os.getenv("ATP_TNS_ALIAS"), + help="Oracle TNS alias from tnsnames.ora (env: ATP_TNS_ALIAS).", + ) + parser.add_argument( + f"--{prefix}oracle-connect-descriptor", + type=str, + default=os.getenv("ATP_DSN"), + help="Oracle connection descriptor/DSN (full connection string) (env: ATP_DSN)", + ) + parser.add_argument( + f"--{prefix}oracle-username", + type=str, + default=os.getenv("ATP_USER"), + help="Oracle database username (env: ATP_USER)", + ) + parser.add_argument( + f"--{prefix}oracle-password", + type=str, + default=os.getenv("ATP_PASSWORD"), + help="Oracle database password (env: ATP_PASSWORD)", + ) + parser.add_argument( + f"--{prefix}oracle-pool-min", + type=int, + default=int(os.getenv("ATP_POOL_MIN", RouterArgs.oracle_pool_min)), + help="Minimum Oracle connection pool size (default: 1, env: ATP_POOL_MIN)", + ) + parser.add_argument( + f"--{prefix}oracle-pool-max", + type=int, + default=int(os.getenv("ATP_POOL_MAX", RouterArgs.oracle_pool_max)), + help="Maximum Oracle connection pool size (default: 16, env: ATP_POOL_MAX)", + ) + parser.add_argument( + f"--{prefix}oracle-pool-timeout-secs", + type=int, + default=int( + os.getenv("ATP_POOL_TIMEOUT_SECS", RouterArgs.oracle_pool_timeout_secs) + ), + help="Oracle connection pool timeout in seconds (default: 30, env: ATP_POOL_TIMEOUT_SECS)", + ) @classmethod def from_cli_args( diff --git a/sgl-router/src/config/validation.rs b/sgl-router/src/config/validation.rs index 97f825f5a..1d8457d61 100644 --- a/sgl-router/src/config/validation.rs +++ b/sgl-router/src/config/validation.rs @@ -29,9 +29,69 @@ impl ConfigValidator { Self::validate_retry(&retry_cfg)?; Self::validate_circuit_breaker(&cb_cfg)?; - if config.history_backend == HistoryBackend::Oracle && config.oracle.is_none() { + // Validate Oracle configuration if enabled + if config.history_backend == HistoryBackend::Oracle { + if config.oracle.is_none() { + return Err(ConfigError::MissingRequired { + field: "oracle".to_string(), + }); + } + // Validate Oracle configuration details + if let Some(oracle) = &config.oracle { + Self::validate_oracle(oracle)?; + } + } + + Ok(()) + } + + /// Validate Oracle configuration + fn validate_oracle(oracle: &OracleConfig) -> ConfigResult<()> { + // Validate username is not empty + if oracle.username.is_empty() { return Err(ConfigError::MissingRequired { - field: "oracle".to_string(), + field: "oracle.username".to_string(), + }); + } + + // Validate password is not empty + if oracle.password.is_empty() { + return Err(ConfigError::MissingRequired { + field: "oracle.password".to_string(), + }); + } + + // Validate connect_descriptor is not empty + if oracle.connect_descriptor.is_empty() { + return Err(ConfigError::MissingRequired { + field: "oracle_dsn or oracle_tns_alias".to_string(), + }); + } + + // Validate pool_min is at least 1 + if oracle.pool_min < 1 { + return Err(ConfigError::InvalidValue { + field: "oracle.pool_min".to_string(), + value: oracle.pool_min.to_string(), + reason: "Must be at least 1".to_string(), + }); + } + + // Validate pool_max is greater than or equal to pool_min + if oracle.pool_max < oracle.pool_min { + return Err(ConfigError::InvalidValue { + field: "oracle.pool_max".to_string(), + value: oracle.pool_max.to_string(), + reason: "Must be >= oracle.pool_min".to_string(), + }); + } + + // Validate pool_timeout_secs is greater than 0 + if oracle.pool_timeout_secs == 0 { + return Err(ConfigError::InvalidValue { + field: "oracle.pool_timeout_secs".to_string(), + value: oracle.pool_timeout_secs.to_string(), + reason: "Must be > 0".to_string(), }); } diff --git a/sgl-router/src/lib.rs b/sgl-router/src/lib.rs index 92dd1950f..6a9a9da1d 100644 --- a/sgl-router/src/lib.rs +++ b/sgl-router/src/lib.rs @@ -30,6 +30,113 @@ pub enum PolicyType { PowerOfTwo, } +#[pyclass(eq)] +#[derive(Clone, PartialEq, Debug)] +pub enum BackendType { + Sglang, + Openai, +} + +#[pyclass(eq)] +#[derive(Clone, PartialEq, Debug)] +pub enum HistoryBackendType { + Memory, + None, + Oracle, +} + +#[pyclass] +#[derive(Clone, PartialEq)] +pub struct PyOracleConfig { + #[pyo3(get, set)] + pub wallet_path: Option, + #[pyo3(get, set)] + pub connect_descriptor: Option, + #[pyo3(get, set)] + pub username: Option, + #[pyo3(get, set)] + pub password: Option, + #[pyo3(get, set)] + pub pool_min: usize, + #[pyo3(get, set)] + pub pool_max: usize, + #[pyo3(get, set)] + pub pool_timeout_secs: u64, +} + +impl std::fmt::Debug for PyOracleConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PyOracleConfig") + .field("wallet_path", &self.wallet_path) + .field("connect_descriptor", &"") + .field("username", &self.username) + .field("password", &"") + .field("pool_min", &self.pool_min) + .field("pool_max", &self.pool_max) + .field("pool_timeout_secs", &self.pool_timeout_secs) + .finish() + } +} + +#[pymethods] +impl PyOracleConfig { + #[new] + #[pyo3(signature = ( + password = None, + username = None, + connect_descriptor = None, + wallet_path = None, + pool_min = 1, + pool_max = 16, + pool_timeout_secs = 30, + ))] + fn new( + password: Option, + username: Option, + connect_descriptor: Option, + wallet_path: Option, + pool_min: usize, + pool_max: usize, + pool_timeout_secs: u64, + ) -> PyResult { + if pool_min == 0 { + return Err(pyo3::exceptions::PyValueError::new_err( + "pool_min must be at least 1", + )); + } + if pool_max < pool_min { + return Err(pyo3::exceptions::PyValueError::new_err( + "pool_max must be >= pool_min", + )); + } + + Ok(PyOracleConfig { + wallet_path, + connect_descriptor, + username, + password, + pool_min, + pool_max, + pool_timeout_secs, + }) + } +} + +impl PyOracleConfig { + fn to_config_oracle(&self) -> config::OracleConfig { + // Simple conversion - validation happens later in validate_oracle() + config::OracleConfig { + wallet_path: self.wallet_path.clone(), + connect_descriptor: self.connect_descriptor.clone().unwrap_or_default(), + username: self.username.clone().unwrap_or_default(), + password: self.password.clone().unwrap_or_default(), + pool_min: self.pool_min, + pool_max: self.pool_max, + pool_timeout_secs: self.pool_timeout_secs, + } + } +} + #[pyclass] #[derive(Debug, Clone, PartialEq)] struct Router { @@ -93,6 +200,9 @@ struct Router { chat_template: Option, reasoning_parser: Option, tool_call_parser: Option, + backend: BackendType, + history_backend: HistoryBackendType, + oracle_config: Option, } impl Router { @@ -132,6 +242,10 @@ impl Router { RoutingMode::Regular { worker_urls: vec![], } + } else if matches!(self.backend, BackendType::Openai) { + RoutingMode::OpenAI { + worker_urls: self.worker_urls.clone(), + } } else if self.pd_disaggregation { RoutingMode::PrefillDecode { prefill_urls: self.prefill_urls.clone().unwrap_or_default(), @@ -170,6 +284,20 @@ impl Router { _ => None, }; + let history_backend = match self.history_backend { + HistoryBackendType::Memory => config::HistoryBackend::Memory, + HistoryBackendType::None => config::HistoryBackend::None, + HistoryBackendType::Oracle => config::HistoryBackend::Oracle, + }; + + let oracle = if matches!(self.history_backend, HistoryBackendType::Oracle) { + self.oracle_config + .as_ref() + .map(|cfg| cfg.to_config_oracle()) + } else { + None + }; + Ok(config::RouterConfig { mode, policy, @@ -218,8 +346,8 @@ impl Router { model_path: self.model_path.clone(), tokenizer_path: self.tokenizer_path.clone(), chat_template: self.chat_template.clone(), - history_backend: config::HistoryBackend::Memory, - oracle: None, + history_backend, + oracle, reasoning_parser: self.reasoning_parser.clone(), tool_call_parser: self.tool_call_parser.clone(), }) @@ -289,6 +417,9 @@ impl Router { chat_template = None, reasoning_parser = None, tool_call_parser = None, + backend = BackendType::Sglang, + history_backend = HistoryBackendType::Memory, + oracle_config = None, ))] #[allow(clippy::too_many_arguments)] fn new( @@ -351,6 +482,9 @@ impl Router { chat_template: Option, reasoning_parser: Option, tool_call_parser: Option, + backend: BackendType, + history_backend: HistoryBackendType, + oracle_config: Option, ) -> PyResult { let mut all_urls = worker_urls.clone(); @@ -427,6 +561,9 @@ impl Router { chat_template, reasoning_parser, tool_call_parser, + backend, + history_backend, + oracle_config, }) } @@ -491,6 +628,9 @@ impl Router { #[pymodule] fn sglang_router_rs(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; Ok(()) } diff --git a/sgl-router/src/routers/openai/conversations.rs b/sgl-router/src/routers/openai/conversations.rs index dfae8a15a..4f534b943 100644 --- a/sgl-router/src/routers/openai/conversations.rs +++ b/sgl-router/src/routers/openai/conversations.rs @@ -14,7 +14,7 @@ use chrono::Utc; use serde_json::{json, Value}; use std::collections::HashMap; use std::sync::Arc; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use super::responses::build_stored_response; @@ -958,14 +958,14 @@ async fn create_and_link_item( .await .map_err(|e| format!("Failed to link item: {}", e))?; - info!( + debug!( conversation_id = %conv_id.0, item_id = %created.id.0, item_type = %created.item_type, "Persisted conversation item and link" ); } else { - info!( + debug!( item_id = %created.id.0, item_type = %created.item_type, "Persisted conversation item (no conversation link)" diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index 607a94dd3..0648c1652 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -27,7 +27,7 @@ use std::{ }; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{info, warn}; +use tracing::warn; // Import from sibling modules use super::conversations::{ @@ -197,6 +197,11 @@ impl OpenAIRouter { Ok(r) => r, Err(e) => { self.circuit_breaker.record_failure(); + tracing::error!( + url = %url, + error = %e, + "Failed to forward request to OpenAI" + ); return ( StatusCode::BAD_GATEWAY, format!("Failed to forward request to OpenAI: {}", e), @@ -518,12 +523,6 @@ impl crate::routers::RouterTrait for OpenAIRouter { ) -> Response { let url = format!("{}/v1/responses", self.base_url); - info!( - requested_store = body.store, - is_streaming = body.stream, - "openai_responses_request" - ); - // Validate mutually exclusive params: previous_response_id and conversation // TODO: this validation logic should move the right place, also we need a proper error message module if body.previous_response_id.is_some() && body.conversation.is_some() { diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index ec8595134..b84198a3e 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -132,19 +132,30 @@ impl AppContext { SharedResponseStorage, SharedConversationStorage, ) = match router_config.history_backend { - HistoryBackend::Memory => ( - Arc::new(MemoryResponseStorage::new()), - Arc::new(MemoryConversationStorage::new()), - ), - HistoryBackend::None => ( - Arc::new(NoOpResponseStorage::new()), - Arc::new(NoOpConversationStorage::new()), - ), + HistoryBackend::Memory => { + info!("Initializing data connector: Memory"); + ( + Arc::new(MemoryResponseStorage::new()), + Arc::new(MemoryConversationStorage::new()), + ) + } + HistoryBackend::None => { + info!("Initializing data connector: None (no persistence)"); + ( + Arc::new(NoOpResponseStorage::new()), + Arc::new(NoOpConversationStorage::new()), + ) + } HistoryBackend::Oracle => { let oracle_cfg = router_config.oracle.clone().ok_or_else(|| { "oracle configuration is required when history_backend=oracle".to_string() })?; + info!( + "Initializing data connector: Oracle ATP (pool: {}-{})", + oracle_cfg.pool_min, oracle_cfg.pool_max + ); + let response_storage = OracleResponseStorage::new(oracle_cfg.clone()).map_err(|err| { format!("failed to initialize Oracle response storage: {err}") @@ -155,6 +166,7 @@ impl AppContext { format!("failed to initialize Oracle conversation storage: {err}") })?; + info!("Data connector initialized successfully: Oracle ATP"); (Arc::new(response_storage), Arc::new(conversation_storage)) } };