diff --git a/sgl-router/src/data_connector/conversation_item_memory_store.rs b/sgl-router/src/data_connector/conversation_item_memory_store.rs index cb8c65787..d15d17031 100644 --- a/sgl-router/src/data_connector/conversation_item_memory_store.rs +++ b/sgl-router/src/data_connector/conversation_item_memory_store.rs @@ -142,6 +142,50 @@ impl ConversationItemStorage for MemoryConversationItemStorage { Ok(results) } + + async fn get_item(&self, item_id: &ConversationItemId) -> Result> { + let items = self.items.read().unwrap(); + Ok(items.get(item_id).cloned()) + } + + async fn is_item_linked( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> Result { + let rev = self.rev_index.read().unwrap(); + if let Some(conv_idx) = rev.get(conversation_id) { + Ok(conv_idx.contains_key(&item_id.0)) + } else { + Ok(false) + } + } + + async fn delete_item( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> Result<()> { + // Get the key from rev_index and remove the entry at the same time + let key_to_remove = { + let mut rev = self.rev_index.write().unwrap(); + if let Some(conv_idx) = rev.get_mut(conversation_id) { + conv_idx.remove(&item_id.0) + } else { + None + } + }; + + // If the item was in rev_index, remove it from links as well + if let Some(key) = key_to_remove { + let mut links = self.links.write().unwrap(); + if let Some(conv_links) = links.get_mut(conversation_id) { + conv_links.remove(&key); + } + } + + Ok(()) + } } #[cfg(test)] diff --git a/sgl-router/src/data_connector/conversation_item_oracle_store.rs b/sgl-router/src/data_connector/conversation_item_oracle_store.rs index 64c1b2503..608c9376c 100644 --- a/sgl-router/src/data_connector/conversation_item_oracle_store.rs +++ b/sgl-router/src/data_connector/conversation_item_oracle_store.rs @@ -243,6 +243,92 @@ impl ConversationItemStorage for OracleConversationItemStorage { ) .collect() } + + async fn get_item(&self, item_id: &ConversationItemId) -> ItemResult> { + let iid = item_id.0.clone(); + + self.with_connection(move |conn| { + let mut stmt = conn + .statement( + "SELECT id, response_id, item_type, role, content, status, created_at \ + FROM conversation_items WHERE id = :1", + ) + .build() + .map_err(map_oracle_error)?; + + let mut rows = stmt.query(&[&iid]).map_err(map_oracle_error)?; + + if let Some(row_res) = rows.next() { + let row = row_res.map_err(map_oracle_error)?; + let id: String = row.get(0).map_err(map_oracle_error)?; + let response_id: Option = row.get(1).map_err(map_oracle_error)?; + let item_type: String = row.get(2).map_err(map_oracle_error)?; + let role: Option = row.get(3).map_err(map_oracle_error)?; + let content_raw: Option = row.get(4).map_err(map_oracle_error)?; + let status: Option = row.get(5).map_err(map_oracle_error)?; + let created_at: DateTime = row.get(6).map_err(map_oracle_error)?; + + let content = match content_raw { + Some(s) => serde_json::from_str(&s)?, + None => Value::Null, + }; + + Ok(Some(ConversationItem { + id: ConversationItemId(id), + response_id, + item_type, + role, + content, + status, + created_at, + })) + } else { + Ok(None) + } + }) + .await + } + + async fn is_item_linked( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> ItemResult { + let cid = conversation_id.0.clone(); + let iid = item_id.0.clone(); + + self.with_connection(move |conn| { + let count: i64 = conn + .query_row_as( + "SELECT COUNT(*) FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2", + &[&cid, &iid], + ) + .map_err(map_oracle_error)?; + Ok(count > 0) + }) + .await + } + + async fn delete_item( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> ItemResult<()> { + let cid = conversation_id.0.clone(); + let iid = item_id.0.clone(); + + self.with_connection(move |conn| { + // Delete ONLY the link (do not delete the item itself) + conn.execute( + "DELETE FROM conversation_item_links WHERE conversation_id = :1 AND item_id = :2", + &[&cid, &iid], + ) + .map_err(map_oracle_error)?; + + Ok(()) + }) + .await + } } #[derive(Clone)] diff --git a/sgl-router/src/data_connector/conversation_items.rs b/sgl-router/src/data_connector/conversation_items.rs index 58f624403..6c99b2b8f 100644 --- a/sgl-router/src/data_connector/conversation_items.rs +++ b/sgl-router/src/data_connector/conversation_items.rs @@ -94,15 +94,32 @@ pub trait ConversationItemStorage: Send + Sync + 'static { conversation_id: &ConversationId, params: ListParams, ) -> Result>; + + /// Get a single item by ID + async fn get_item(&self, item_id: &ConversationItemId) -> Result>; + + /// Check if an item is linked to a conversation + async fn is_item_linked( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> Result; + + /// Delete an item link from a conversation (does not delete the item itself) + async fn delete_item( + &self, + conversation_id: &ConversationId, + item_id: &ConversationItemId, + ) -> Result<()>; } pub type SharedConversationItemStorage = Arc; /// Helper to build id prefix based on item_type pub fn make_item_id(item_type: &str) -> ConversationItemId { - // Generate a 24-byte random hex string (48 hex chars), consistent with conversation id style + // Generate exactly 50 hex characters (25 bytes) for the part after the underscore let mut rng = rand::rng(); - let mut bytes = [0u8; 24]; + let mut bytes = [0u8; 25]; rng.fill_bytes(&mut bytes); let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); diff --git a/sgl-router/src/routers/mod.rs b/sgl-router/src/routers/mod.rs index 29d4a6c7f..58274de3f 100644 --- a/sgl-router/src/routers/mod.rs +++ b/sgl-router/src/routers/mod.rs @@ -190,6 +190,50 @@ pub trait RouterTrait: Send + Sync + Debug { .into_response() } + /// Create items in a conversation + async fn create_conversation_items( + &self, + _headers: Option<&HeaderMap>, + _conversation_id: &str, + _body: &Value, + ) -> Response { + ( + StatusCode::NOT_IMPLEMENTED, + "Conversation items create endpoint not implemented", + ) + .into_response() + } + + /// Get a single conversation item + /// The `include` parameter is accepted but not yet implemented + async fn get_conversation_item( + &self, + _headers: Option<&HeaderMap>, + _conversation_id: &str, + _item_id: &str, + _include: Option>, + ) -> Response { + ( + StatusCode::NOT_IMPLEMENTED, + "Conversation item get endpoint not implemented", + ) + .into_response() + } + + /// Delete a conversation item + async fn delete_conversation_item( + &self, + _headers: Option<&HeaderMap>, + _conversation_id: &str, + _item_id: &str, + ) -> Response { + ( + StatusCode::NOT_IMPLEMENTED, + "Conversation item delete endpoint not implemented", + ) + .into_response() + } + /// Get router type name fn router_type(&self) -> &'static str; diff --git a/sgl-router/src/routers/openai/conversations.rs b/sgl-router/src/routers/openai/conversations.rs index e98ad11ef..dfae8a15a 100644 --- a/sgl-router/src/routers/openai/conversations.rs +++ b/sgl-router/src/routers/openai/conversations.rs @@ -2,8 +2,9 @@ use crate::data_connector::{ conversation_items::ListParams, conversation_items::SortOrder, Conversation, ConversationId, - ConversationItemStorage, ConversationStorage, NewConversation, NewConversationItem, ResponseId, - ResponseStorage, SharedConversationItemStorage, SharedConversationStorage, + ConversationItemId, ConversationItemStorage, ConversationStorage, NewConversation, + NewConversationItem, ResponseId, ResponseStorage, SharedConversationItemStorage, + SharedConversationStorage, }; use crate::protocols::spec::{ResponseInput, ResponsesRequest}; use axum::http::StatusCode; @@ -315,17 +316,12 @@ pub(super) async fn list_conversation_items( let item_values: Vec = items .iter() .map(|item| { - let mut obj = serde_json::Map::new(); - obj.insert("id".to_string(), json!(item.id.0)); - obj.insert("type".to_string(), json!(item.item_type)); - obj.insert("created_at".to_string(), json!(item.created_at)); - - obj.insert("content".to_string(), item.content.clone()); - if let Some(status) = &item.status { - obj.insert("status".to_string(), json!(status)); + let mut item_json = item_to_json(item); + // Add created_at field for list view + if let Some(obj) = item_json.as_object_mut() { + obj.insert("created_at".to_string(), json!(item.created_at)); } - - Value::Object(obj) + item_json }) .collect(); @@ -352,6 +348,569 @@ pub(super) async fn list_conversation_items( } } +// ============================================================================ +// Conversation Item Operations +// ============================================================================ + +/// Supported item types for creation +/// Types marked as "implemented" are fully supported +/// Types marked as "accepted" are stored but return not-implemented warnings +const SUPPORTED_ITEM_TYPES: &[&str] = &[ + // Fully implemented types + "message", + "reasoning", + "mcp_list_tools", + "mcp_call", + "item_reference", + // Accepted but not yet implemented (stored, warning returned) + "function_tool_call", + "function_call_output", + "file_search_call", + "computer_call", + "computer_call_output", + "web_search_call", + "image_generation_call", + "code_interpreter_call", + "local_shell_call", + "local_shell_call_output", + "mcp_approval_request", + "mcp_approval_response", + "custom_tool_call", + "custom_tool_call_output", +]; + +/// Item types that are fully implemented with business logic +const IMPLEMENTED_ITEM_TYPES: &[&str] = &[ + "message", + "reasoning", + "mcp_list_tools", + "mcp_call", + "item_reference", +]; + +/// Create items in a conversation (bulk operation) +pub(super) async fn create_conversation_items( + conversation_storage: &SharedConversationStorage, + item_storage: &SharedConversationItemStorage, + conv_id: &str, + body: Value, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + // Verify conversation exists + match conversation_storage + .get_conversation(&conversation_id) + .await + { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to get conversation: {}", e)})), + ) + .into_response(); + } + } + + // Parse items array from request + let items_array = match body.get("items").and_then(|v| v.as_array()) { + Some(arr) => arr, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "Missing or invalid 'items' field"})), + ) + .into_response(); + } + }; + + // Validate limit (max 20 items per OpenAI spec) + if items_array.len() > 20 { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "Cannot add more than 20 items at a time"})), + ) + .into_response(); + } + + // Convert and create items + let mut created_items = Vec::new(); + let mut warnings = Vec::new(); + let added_at = Utc::now(); + + for item_val in items_array { + let item_type = item_val + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("message"); + + // Handle item_reference specially - link existing item instead of creating new + if item_type == "item_reference" { + let ref_id = match item_val.get("id").and_then(|v| v.as_str()) { + Some(id) => id, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "item_reference requires 'id' field"})), + ) + .into_response(); + } + }; + + let existing_item_id = ConversationItemId::from(ref_id); + + // Retrieve the existing item + let existing_item = match item_storage.get_item(&existing_item_id).await { + Ok(Some(item)) => item, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": format!("Referenced item '{}' not found", ref_id)})), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to get referenced item: {}", e)})), + ) + .into_response(); + } + }; + + // Link existing item to this conversation + if let Err(e) = item_storage + .link_item(&conversation_id, &existing_item.id, added_at) + .await + { + warn!("Failed to link item {}: {}", existing_item.id.0, e); + } + + created_items.push(item_to_json(&existing_item)); + continue; + } + + // Check if user provided an ID + let user_provided_id = item_val.get("id").and_then(|v| v.as_str()); + + let item = if let Some(id_str) = user_provided_id { + // User provided an ID - check if it already exists in DB + let item_id = ConversationItemId::from(id_str); + + // First check if this item is already linked to this conversation + let is_already_linked = match item_storage + .is_item_linked(&conversation_id, &item_id) + .await + { + Ok(linked) => linked, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to check item link: {}", e)})), + ) + .into_response(); + } + }; + + if is_already_linked { + // Item already linked to this conversation - return error + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": { + "message": "Item already in conversation", + "type": "invalid_request_error", + "param": "items", + "code": "item_already_in_conversation" + } + })), + ) + .into_response(); + } + + // Check if item exists in DB + let existing_item = match item_storage.get_item(&item_id).await { + Ok(Some(item)) => item, + Ok(None) => { + // Item doesn't exist in DB, create new one with user-provided content + let (new_item, warning) = match parse_item_from_value(item_val) { + Ok((mut item, warn)) => { + // Use the user-provided ID + item.id = Some(item_id.clone()); + (item, warn) + } + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": format!("Invalid item: {}", e)})), + ) + .into_response(); + } + }; + + // Collect warnings for not-implemented types + if let Some(w) = warning { + warnings.push(w); + } + + // Create item with provided ID + match item_storage.create_item(new_item).await { + Ok(item) => item, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to create item: {}", e)})), + ) + .into_response(); + } + } + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to check item existence: {}", e)})), + ) + .into_response(); + } + }; + + existing_item + } else { + // No ID provided - parse and create new item normally + let (new_item, warning) = match parse_item_from_value(item_val) { + Ok((item, warn)) => (item, warn), + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": format!("Invalid item: {}", e)})), + ) + .into_response(); + } + }; + + // Collect warnings for not-implemented types + if let Some(w) = warning { + warnings.push(w); + } + + // Create item + match item_storage.create_item(new_item).await { + Ok(item) => item, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to create item: {}", e)})), + ) + .into_response(); + } + } + }; + + // Link to conversation + if let Err(e) = item_storage + .link_item(&conversation_id, &item.id, added_at) + .await + { + warn!("Failed to link item {}: {}", item.id.0, e); + } + + created_items.push(item_to_json(&item)); + } + + // Build response matching OpenAI format + let first_id = created_items.first().and_then(|v| v.get("id")); + let last_id = created_items.last().and_then(|v| v.get("id")); + + let mut response = json!({ + "object": "list", + "data": created_items, + "first_id": first_id, + "last_id": last_id, + "has_more": false + }); + + // Add warnings if any not-implemented types were used + if !warnings.is_empty() { + if let Some(obj) = response.as_object_mut() { + obj.insert("warnings".to_string(), json!(warnings)); + } + } + + (StatusCode::OK, Json(response)).into_response() +} + +/// Get a single conversation item +/// Note: `include` query parameter is accepted but not yet implemented +pub(super) async fn get_conversation_item( + conversation_storage: &SharedConversationStorage, + item_storage: &SharedConversationItemStorage, + conv_id: &str, + item_id: &str, + _include: Option>, // Reserved for future use +) -> Response { + let conversation_id = ConversationId::from(conv_id); + let item_id = ConversationItemId::from(item_id); + + // Verify conversation exists + match conversation_storage + .get_conversation(&conversation_id) + .await + { + Ok(Some(_)) => {} + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to get conversation: {}", e)})), + ) + .into_response(); + } + } + + // First check if the item is linked to this conversation + let is_linked = match item_storage + .is_item_linked(&conversation_id, &item_id) + .await + { + Ok(linked) => linked, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to check item link: {}", e)})), + ) + .into_response(); + } + }; + + if !is_linked { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Item not found in this conversation"})), + ) + .into_response(); + } + + // Get the item + match item_storage.get_item(&item_id).await { + Ok(Some(item)) => { + // TODO: Process `include` parameter when implemented + // Example: include=["metadata", "timestamps"] + (StatusCode::OK, Json(item_to_json(&item))).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Item not found"})), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to get item: {}", e)})), + ) + .into_response(), + } +} + +/// Delete a conversation item +pub(super) async fn delete_conversation_item( + conversation_storage: &SharedConversationStorage, + item_storage: &SharedConversationItemStorage, + conv_id: &str, + item_id: &str, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + let item_id = ConversationItemId::from(item_id); + + // Verify conversation exists and get it for response + let conversation = match conversation_storage + .get_conversation(&conversation_id) + .await + { + Ok(Some(conv)) => conv, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to get conversation: {}", e)})), + ) + .into_response(); + } + }; + + // Delete the item + match item_storage.delete_item(&conversation_id, &item_id).await { + Ok(_) => { + info!( + conversation_id = %conversation_id.0, + item_id = %item_id.0, + "Deleted conversation item" + ); + + // Return updated conversation object (per OpenAI spec) + (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": format!("Failed to delete item: {}", e)})), + ) + .into_response(), + } +} + +/// Parse NewConversationItem from Value +/// Returns (NewConversationItem, Option) +/// Supports three top-level structures: +/// 1. Input message: {"type": "message", "role": "...", "content": [...]} +/// 2. Item: {"type": "message|function_tool_call|...", ...} +/// 3. Item reference: {"type": "item_reference", "id": "..."} +fn parse_item_from_value( + item_val: &Value, +) -> Result<(NewConversationItem, Option), String> { + // Detect structure type + let item_type = item_val + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("message"); + + // Validate item type is supported + if !SUPPORTED_ITEM_TYPES.contains(&item_type) { + return Err(format!( + "Unsupported item type '{}'. Supported types: {}", + item_type, + SUPPORTED_ITEM_TYPES.join(", ") + )); + } + + // Check if type is implemented or just accepted + let warning = if !IMPLEMENTED_ITEM_TYPES.contains(&item_type) { + Some(format!( + "Item type '{}' is accepted but not yet implemented. \ + The item will be stored but may not function as expected.", + item_type + )) + } else { + None + }; + + // Parse common fields + let role = item_val + .get("role") + .and_then(|v| v.as_str()) + .map(String::from); + let status = item_val + .get("status") + .and_then(|v| v.as_str()) + .map(String::from) + .or_else(|| Some("completed".to_string())); // Default status + + // Validate message types have role + if item_type == "message" && role.is_none() { + return Err("Message items require 'role' field".to_string()); + } + + // For special types (mcp_call, function_tool_call, etc.), store the entire item_val as content + // For message types, use the content field directly + let content = if item_type == "message" || item_type == "reasoning" { + item_val.get("content").cloned().unwrap_or(json!([])) + } else { + // Store entire item for extraction later + item_val.clone() + }; + + Ok(( + NewConversationItem { + id: None, + response_id: None, + item_type: item_type.to_string(), + role, + content, + status, + }, + warning, + )) +} + +/// Convert ConversationItem to JSON response format +/// Extracts fields from content for special types (mcp_call, mcp_list_tools, etc.) +fn item_to_json(item: &crate::data_connector::conversation_items::ConversationItem) -> Value { + let mut obj = serde_json::Map::new(); + obj.insert("id".to_string(), json!(item.id.0)); + obj.insert("type".to_string(), json!(item.item_type)); + + if let Some(role) = &item.role { + obj.insert("role".to_string(), json!(role)); + } + + // Handle special item types that need field extraction from content + match item.item_type.as_str() { + "mcp_call" => { + // Extract mcp_call fields: name, arguments, output, server_label, approval_request_id, error + if let Some(content_obj) = item.content.as_object() { + if let Some(name) = content_obj.get("name") { + obj.insert("name".to_string(), name.clone()); + } + if let Some(arguments) = content_obj.get("arguments") { + obj.insert("arguments".to_string(), arguments.clone()); + } + if let Some(output) = content_obj.get("output") { + obj.insert("output".to_string(), output.clone()); + } + if let Some(server_label) = content_obj.get("server_label") { + obj.insert("server_label".to_string(), server_label.clone()); + } + if let Some(approval_request_id) = content_obj.get("approval_request_id") { + obj.insert( + "approval_request_id".to_string(), + approval_request_id.clone(), + ); + } + if let Some(error) = content_obj.get("error") { + obj.insert("error".to_string(), error.clone()); + } + } + } + "mcp_list_tools" => { + // Extract mcp_list_tools fields: tools, server_label + if let Some(content_obj) = item.content.as_object() { + if let Some(tools) = content_obj.get("tools") { + obj.insert("tools".to_string(), tools.clone()); + } + if let Some(server_label) = content_obj.get("server_label") { + obj.insert("server_label".to_string(), server_label.clone()); + } + } + } + _ => { + // For all other types (message, reasoning, etc.), keep content as-is + obj.insert("content".to_string(), item.content.clone()); + } + } + + if let Some(status) = &item.status { + obj.insert("status".to_string(), json!(status)); + } + + Value::Object(obj) +} + // ============================================================================ // Persistence Operations // ============================================================================ @@ -374,10 +933,11 @@ pub(super) async fn persist_conversation_items( .await } -/// Helper function to create and link a conversation item (two-step API) +/// Helper function to create and optionally link a conversation item +/// If conv_id is None, only creates the item without linking async fn create_and_link_item( item_storage: &Arc, - conv_id: &ConversationId, + conv_id_opt: Option<&ConversationId>, mut new_item: NewConversationItem, ) -> Result<(), String> { // Set default status if not provided @@ -391,18 +951,26 @@ async fn create_and_link_item( .await .map_err(|e| format!("Failed to create item: {}", e))?; - // Step 2: Link it to the conversation - item_storage - .link_item(conv_id, &created.id, Utc::now()) - .await - .map_err(|e| format!("Failed to link item: {}", e))?; + // Step 2: Link it to the conversation (if provided) + if let Some(conv_id) = conv_id_opt { + item_storage + .link_item(conv_id, &created.id, Utc::now()) + .await + .map_err(|e| format!("Failed to link item: {}", e))?; - info!( - conversation_id = %conv_id.0, - item_id = %created.id.0, - item_type = %created.item_type, - "Persisted conversation item and link" - ); + info!( + conversation_id = %conv_id.0, + item_id = %created.id.0, + item_type = %created.item_type, + "Persisted conversation item and link" + ); + } else { + info!( + item_id = %created.id.0, + item_type = %created.item_type, + "Persisted conversation item (no conversation link)" + ); + } Ok(()) } @@ -415,21 +983,26 @@ async fn persist_items_with_storages( response_json: &Value, original_body: &ResponsesRequest, ) -> Result<(), String> { - let conv_id = match &original_body.conversation { - Some(id) => ConversationId::from(id.as_str()), - None => return Ok(()), + // Check if conversation is provided and validate it + let conv_id_opt = match &original_body.conversation { + Some(id) => { + let conv_id = ConversationId::from(id.as_str()); + // Verify conversation exists + if conversation_storage + .get_conversation(&conv_id) + .await + .map_err(|e| format!("Failed to get conversation: {}", e))? + .is_none() + { + warn!(conversation_id = %conv_id.0, "Conversation not found, skipping item linking"); + None // Conversation doesn't exist, store items without linking + } else { + Some(conv_id) + } + } + None => None, // No conversation provided, store items without linking }; - if conversation_storage - .get_conversation(&conv_id) - .await - .map_err(|e| format!("Failed to get conversation: {}", e))? - .is_none() - { - warn!(conversation_id = %conv_id.0, "Conversation not found, skipping item persistence"); - return Ok(()); - } - let response_id_str = response_json .get("id") .and_then(|v| v.as_str()) @@ -438,60 +1011,64 @@ async fn persist_items_with_storages( let response_id_opt = Some(response_id_str.to_string()); - // Persist input items - match &original_body.input { - ResponseInput::Text(text) => { - let new_item = NewConversationItem { - id: None, // Let storage generate ID - response_id: response_id_opt.clone(), - item_type: "message".to_string(), - role: Some("user".to_string()), - content: json!([{ "type": "input_text", "text": text }]), - status: Some("completed".to_string()), - }; - create_and_link_item(&item_storage, &conv_id, new_item).await?; - } - ResponseInput::Items(items_array) => { - for input_item in items_array { - match input_item { - crate::protocols::spec::ResponseInputOutputItem::Message { - role, - content, - status, - .. - } => { - let content_v = serde_json::to_value(content) - .map_err(|e| format!("Failed to serialize content: {}", e))?; - let new_item = NewConversationItem { - id: None, - response_id: response_id_opt.clone(), - item_type: "message".to_string(), - role: Some(role.clone()), - content: content_v, - status: status.clone(), - }; - create_and_link_item(&item_storage, &conv_id, new_item).await?; - } - _ => { - // For other types (FunctionToolCall, etc.), serialize the whole item - let item_val = serde_json::to_value(input_item) - .map_err(|e| format!("Failed to serialize item: {}", e))?; - let new_item = NewConversationItem { - id: None, - response_id: response_id_opt.clone(), - item_type: "unknown".to_string(), - role: None, - content: item_val, - status: Some("completed".to_string()), - }; - create_and_link_item(&item_storage, &conv_id, new_item).await?; + // Persist input items (only if conversation is provided) + if conv_id_opt.is_some() { + match &original_body.input { + ResponseInput::Text(text) => { + let new_item = NewConversationItem { + id: None, // Let storage generate ID + response_id: response_id_opt.clone(), + item_type: "message".to_string(), + role: Some("user".to_string()), + content: json!([{ "type": "input_text", "text": text }]), + status: Some("completed".to_string()), + }; + create_and_link_item(&item_storage, conv_id_opt.as_ref(), new_item).await?; + } + ResponseInput::Items(items_array) => { + for input_item in items_array { + match input_item { + crate::protocols::spec::ResponseInputOutputItem::Message { + role, + content, + status, + .. + } => { + let content_v = serde_json::to_value(content) + .map_err(|e| format!("Failed to serialize content: {}", e))?; + let new_item = NewConversationItem { + id: None, + response_id: response_id_opt.clone(), + item_type: "message".to_string(), + role: Some(role.clone()), + content: content_v, + status: status.clone(), + }; + create_and_link_item(&item_storage, conv_id_opt.as_ref(), new_item) + .await?; + } + _ => { + // For other types (FunctionToolCall, etc.), serialize the whole item + let item_val = serde_json::to_value(input_item) + .map_err(|e| format!("Failed to serialize item: {}", e))?; + let new_item = NewConversationItem { + id: None, + response_id: response_id_opt.clone(), + item_type: "unknown".to_string(), + role: None, + content: item_val, + status: Some("completed".to_string()), + }; + create_and_link_item(&item_storage, conv_id_opt.as_ref(), new_item) + .await?; + } } } } } } - // Persist output items + // Persist output items - ALWAYS persist output items, even if no conversation if let Some(output_arr) = response_json.get("output").and_then(|v| v.as_array()) { for output_item in output_arr { if let Some(obj) = output_item.as_object() { @@ -503,34 +1080,27 @@ async fn persist_items_with_storages( let role = obj.get("role").and_then(|v| v.as_str()).map(String::from); let status = obj.get("status").and_then(|v| v.as_str()).map(String::from); + // Extract the original item ID from the response + let item_id = obj + .get("id") + .and_then(|v| v.as_str()) + .map(ConversationItemId::from); + let content = if item_type == "message" { obj.get("content").cloned().unwrap_or(json!([])) - } else if item_type == "function_call" || item_type == "function_tool_call" { - json!({ - "type": "function_call", - "name": obj.get("name"), - "call_id": obj.get("call_id").or_else(|| obj.get("id")), - "arguments": obj.get("arguments") - }) - } else if item_type == "function_call_output" { - json!({ - "type": "function_call_output", - "call_id": obj.get("call_id"), - "output": obj.get("output") - }) } else { output_item.clone() }; let new_item = NewConversationItem { - id: None, + id: item_id, // Use the original ID from response response_id: response_id_opt.clone(), item_type: item_type.to_string(), role, content, status, }; - create_and_link_item(&item_storage, &conv_id, new_item).await?; + create_and_link_item(&item_storage, conv_id_opt.as_ref(), new_item).await?; } } } @@ -543,9 +1113,13 @@ async fn persist_items_with_storages( response_storage .store_response(stored_response) .await - .map_err(|e| format!("Failed to store response in conversation: {}", e))?; + .map_err(|e| format!("Failed to store response: {}", e))?; - info!(conversation_id = %conv_id.0, response_id = %final_response_id.0, "Persisted conversation items and response"); + if let Some(conv_id) = &conv_id_opt { + info!(conversation_id = %conv_id.0, response_id = %final_response_id.0, "Persisted conversation items and response"); + } else { + info!(response_id = %final_response_id.0, "Persisted items and response (no conversation)"); + } Ok(()) } @@ -555,7 +1129,7 @@ async fn persist_items_with_storages( // ============================================================================ /// Convert conversation to JSON response -fn conversation_to_json(conversation: &Conversation) -> Value { +pub(crate) fn conversation_to_json(conversation: &Conversation) -> Value { let mut response = json!({ "id": conversation.id.0, "object": "conversation", diff --git a/sgl-router/src/routers/openai/mcp.rs b/sgl-router/src/routers/openai/mcp.rs index 3d3cdcf55..d50fab6d0 100644 --- a/sgl-router/src/routers/openai/mcp.rs +++ b/sgl-router/src/routers/openai/mcp.rs @@ -829,7 +829,8 @@ pub(super) fn build_incomplete_response( pub(super) fn generate_mcp_id(prefix: &str) -> String { use rand::RngCore; let mut rng = rand::rng(); - let mut bytes = [0u8; 30]; + // Generate exactly 50 hex characters (25 bytes) for the part after the underscore + let mut bytes = [0u8; 25]; rng.fill_bytes(&mut bytes); let hex_string: String = bytes.iter().map(|b| format!("{:02x}", b)).collect(); format!("{}_{}", prefix, hex_string) diff --git a/sgl-router/src/routers/openai/responses.rs b/sgl-router/src/routers/openai/responses.rs index 866c6d45e..58ca966b5 100644 --- a/sgl-router/src/routers/openai/responses.rs +++ b/sgl-router/src/routers/openai/responses.rs @@ -1,10 +1,10 @@ //! Response storage, patching, and extraction utilities -use crate::data_connector::{ResponseId, SharedResponseStorage, StoredResponse}; +use crate::data_connector::{ResponseId, StoredResponse}; use crate::protocols::spec::{ResponseInput, ResponseToolType, ResponsesRequest}; use serde_json::{json, Value}; use std::collections::HashMap; -use tracing::{info, warn}; +use tracing::warn; use super::utils::event_types; @@ -12,25 +12,6 @@ use super::utils::event_types; // Response Storage Operations // ============================================================================ -/// Store a response internally (checks if storage is enabled) -pub(super) async fn store_response_internal( - response_storage: &SharedResponseStorage, - response_json: &Value, - original_body: &ResponsesRequest, -) -> Result<(), String> { - if !original_body.store { - return Ok(()); - } - - match store_response_impl(response_storage, response_json, original_body).await { - Ok(response_id) => { - info!(response_id = %response_id.0, "Stored response locally"); - Ok(()) - } - Err(e) => Err(e), - } -} - /// Build a StoredResponse from response JSON and original request pub(super) fn build_stored_response( response_json: &Value, @@ -98,20 +79,6 @@ pub(super) fn build_stored_response( stored_response } -/// Store response implementation (public for use across modules) -pub(super) async fn store_response_impl( - response_storage: &SharedResponseStorage, - response_json: &Value, - original_body: &ResponsesRequest, -) -> Result { - let stored_response = build_stored_response(response_json, original_body); - - response_storage - .store_response(stored_response) - .await - .map_err(|e| format!("Failed to store response: {}", e)) -} - // ============================================================================ // Response JSON Patching // ============================================================================ diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index 478a7301e..11ce66fd2 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -31,14 +31,15 @@ use tracing::{info, warn}; // Import from sibling modules use super::conversations::{ - create_conversation, delete_conversation, get_conversation, list_conversation_items, - persist_conversation_items, update_conversation, + create_conversation, create_conversation_items, delete_conversation, delete_conversation_item, + get_conversation, get_conversation_item, list_conversation_items, persist_conversation_items, + update_conversation, }; use super::mcp::{ execute_tool_loop, mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming, McpLoopConfig, }; -use super::responses::{mask_tools_as_mcp, patch_streaming_response_json, store_response_internal}; +use super::responses::{mask_tools_as_mcp, patch_streaming_response_json}; use super::streaming::handle_streaming_response; // ============================================================================ @@ -230,26 +231,17 @@ impl OpenAIRouter { original_previous_response_id.as_deref(), ); - // Persist conversation items if conversation is provided - if original_body.conversation.is_some() { - if let Err(err) = persist_conversation_items( - self.conversation_storage.clone(), - self.conversation_item_storage.clone(), - self.response_storage.clone(), - &response_json, - original_body, - ) - .await - { - warn!("Failed to persist conversation items: {}", err); - } - } else { - // Store response only if no conversation (persist_conversation_items already stores it) - if let Err(err) = - store_response_internal(&self.response_storage, &response_json, original_body).await - { - warn!("Failed to store response: {}", err); - } + // Always persist conversation items and response (even without conversation) + if let Err(err) = persist_conversation_items( + self.conversation_storage.clone(), + self.conversation_item_storage.clone(), + self.response_storage.clone(), + &response_json, + original_body, + ) + .await + { + warn!("Failed to persist conversation items: {}", err); } (StatusCode::OK, Json(response_json)).into_response() @@ -906,4 +898,51 @@ impl crate::routers::RouterTrait for OpenAIRouter { ) .await } + + async fn create_conversation_items( + &self, + _headers: Option<&HeaderMap>, + conversation_id: &str, + body: &Value, + ) -> Response { + create_conversation_items( + &self.conversation_storage, + &self.conversation_item_storage, + conversation_id, + body.clone(), + ) + .await + } + + async fn get_conversation_item( + &self, + _headers: Option<&HeaderMap>, + conversation_id: &str, + item_id: &str, + include: Option>, + ) -> Response { + get_conversation_item( + &self.conversation_storage, + &self.conversation_item_storage, + conversation_id, + item_id, + include, + ) + .await + } + + async fn delete_conversation_item( + &self, + _headers: Option<&HeaderMap>, + conversation_id: &str, + item_id: &str, + ) -> Response { + delete_conversation_item( + &self.conversation_storage, + &self.conversation_item_storage, + conversation_id, + item_id, + ) + .await + } } diff --git a/sgl-router/src/routers/openai/streaming.rs b/sgl-router/src/routers/openai/streaming.rs index a60c57949..b643840d6 100644 --- a/sgl-router/src/routers/openai/streaming.rs +++ b/sgl-router/src/routers/openai/streaming.rs @@ -32,9 +32,7 @@ use super::mcp::{ mcp_manager_from_request_tools, prepare_mcp_payload_for_streaming, send_mcp_list_tools_events, McpLoopConfig, ToolLoopState, }; -use super::responses::{ - mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block, store_response_impl, -}; +use super::responses::{mask_tools_as_mcp, patch_streaming_response_json, rewrite_streaming_block}; use super::utils::{event_types, FunctionCallInProgress, OutputIndexMapper, StreamAction}; // ============================================================================ @@ -1082,26 +1080,17 @@ pub(super) async fn handle_simple_streaming_passthrough( previous_response_id.as_deref(), ); - if persist_needed { - if let Err(err) = persist_conversation_items( - conversation_storage.clone(), - conversation_item_storage.clone(), - response_storage.clone(), - &response_json, - &original_request, - ) - .await - { - warn!("Failed to persist conversation items (stream): {}", err); - } - } else if should_store { - // Store response only if no conversation (persist_conversation_items already stores it) - if let Err(err) = - store_response_impl(&response_storage, &response_json, &original_request) - .await - { - warn!("Failed to store streaming response: {}", err); - } + // Always persist conversation items and response (even without conversation) + if let Err(err) = persist_conversation_items( + conversation_storage.clone(), + conversation_item_storage.clone(), + response_storage.clone(), + &response_json, + &original_request, + ) + .await + { + warn!("Failed to persist conversation items (stream): {}", err); } } else if let Some(error_payload) = encountered_error { warn!("Upstream streaming error payload: {}", error_payload); @@ -1390,32 +1379,20 @@ pub(super) async fn handle_streaming_with_tool_interception( previous_response_id.as_deref(), ); - if persist_needed { - if let Err(err) = persist_conversation_items( - conversation_storage.clone(), - conversation_item_storage.clone(), - response_storage.clone(), - &response_json, - &original_request, - ) - .await - { - warn!( - "Failed to persist conversation items (stream + MCP): {}", - err - ); - } - } else if should_store { - // Store response only if no conversation (persist_conversation_items already stores it) - if let Err(err) = store_response_impl( - &response_storage, - &response_json, - &original_request, - ) - .await - { - warn!("Failed to store streaming response: {}", err); - } + // Always persist conversation items and response (even without conversation) + if let Err(err) = persist_conversation_items( + conversation_storage.clone(), + conversation_item_storage.clone(), + response_storage.clone(), + &response_json, + &original_request, + ) + .await + { + warn!( + "Failed to persist conversation items (stream + MCP): {}", + err + ); } } diff --git a/sgl-router/src/routers/router_manager.rs b/sgl-router/src/routers/router_manager.rs index 063aa660e..740354f4f 100644 --- a/sgl-router/src/routers/router_manager.rs +++ b/sgl-router/src/routers/router_manager.rs @@ -614,6 +614,76 @@ impl RouterTrait for RouterManager { .into_response() } } + + async fn create_conversation_items( + &self, + headers: Option<&HeaderMap>, + conversation_id: &str, + body: &Value, + ) -> Response { + let router = self.select_router_for_request(headers, None); + if let Some(router) = router { + router + .create_conversation_items(headers, conversation_id, body) + .await + } else { + ( + StatusCode::NOT_FOUND, + format!( + "No router available to create conversation items for '{}'", + conversation_id + ), + ) + .into_response() + } + } + + async fn get_conversation_item( + &self, + headers: Option<&HeaderMap>, + conversation_id: &str, + item_id: &str, + include: Option>, + ) -> Response { + let router = self.select_router_for_request(headers, None); + if let Some(router) = router { + router + .get_conversation_item(headers, conversation_id, item_id, include) + .await + } else { + ( + StatusCode::NOT_FOUND, + format!( + "No router available to get conversation item '{}' in '{}'", + item_id, conversation_id + ), + ) + .into_response() + } + } + + async fn delete_conversation_item( + &self, + headers: Option<&HeaderMap>, + conversation_id: &str, + item_id: &str, + ) -> Response { + let router = self.select_router_for_request(headers, None); + if let Some(router) = router { + router + .delete_conversation_item(headers, conversation_id, item_id) + .await + } else { + ( + StatusCode::NOT_FOUND, + format!( + "No router available to delete conversation item '{}' in '{}'", + item_id, conversation_id + ), + ) + .into_response() + } + } } impl std::fmt::Debug for RouterManager { diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index ad222f4fd..74d4158af 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -440,6 +440,47 @@ async fn v1_conversations_list_items( .await } +#[derive(Deserialize, Default)] +struct GetItemQuery { + /// Additional fields to include in response (not yet implemented) + include: Option>, +} + +async fn v1_conversations_create_items( + State(state): State>, + Path(conversation_id): Path, + headers: http::HeaderMap, + Json(body): Json, +) -> Response { + state + .router + .create_conversation_items(Some(&headers), &conversation_id, &body) + .await +} + +async fn v1_conversations_get_item( + State(state): State>, + Path((conversation_id, item_id)): Path<(String, String)>, + Query(query): Query, + headers: http::HeaderMap, +) -> Response { + state + .router + .get_conversation_item(Some(&headers), &conversation_id, &item_id, query.include) + .await +} + +async fn v1_conversations_delete_item( + State(state): State>, + Path((conversation_id, item_id)): Path<(String, String)>, + headers: http::HeaderMap, +) -> Response { + state + .router + .delete_conversation_item(Some(&headers), &conversation_id, &item_id) + .await +} + #[derive(Deserialize)] struct AddWorkerQuery { url: String, @@ -716,7 +757,11 @@ pub fn build_app( ) .route( "/v1/conversations/{conversation_id}/items", - get(v1_conversations_list_items), + get(v1_conversations_list_items).post(v1_conversations_create_items), + ) + .route( + "/v1/conversations/{conversation_id}/items/{item_id}", + get(v1_conversations_get_item).delete(v1_conversations_delete_item), ) .route_layer(axum::middleware::from_fn_with_state( app_state.clone(), diff --git a/sgl-router/tests/responses_api_test.rs b/sgl-router/tests/responses_api_test.rs index 1019a6846..4e640c1b3 100644 --- a/sgl-router/tests/responses_api_test.rs +++ b/sgl-router/tests/responses_api_test.rs @@ -1333,3 +1333,519 @@ async fn test_streaming_multi_turn_with_mcp() { worker.stop().await; mcp.stop().await; } + +#[tokio::test] +async fn test_conversation_items_create_and_get() { + // Test creating items and getting a specific item + let router_cfg = RouterConfig { + mode: RoutingMode::OpenAI { + worker_urls: vec!["http://localhost".to_string()], + }, + connection_mode: ConnectionMode::Http, + policy: PolicyConfig::Random, + host: "127.0.0.1".to_string(), + port: 0, + max_payload_size: 8 * 1024 * 1024, + request_timeout_secs: 60, + worker_startup_timeout_secs: 1, + worker_startup_check_interval_secs: 1, + dp_aware: false, + api_key: None, + discovery: None, + metrics: None, + log_dir: None, + log_level: Some("warn".to_string()), + request_id_headers: None, + max_concurrent_requests: 8, + queue_size: 0, + queue_timeout_secs: 5, + rate_limit_tokens_per_second: None, + cors_allowed_origins: vec![], + retry: RetryConfig::default(), + circuit_breaker: CircuitBreakerConfig::default(), + disable_retries: false, + disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), + enable_igw: false, + model_path: None, + tokenizer_path: None, + history_backend: sglang_router_rs::config::HistoryBackend::Memory, + oracle: None, + reasoning_parser: None, + tool_call_parser: None, + }; + + let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx"); + let router = RouterFactory::create_router(&Arc::new(ctx)) + .await + .expect("router"); + + // Create conversation + let create_conv = serde_json::json!({}); + let conv_resp = router.create_conversation(None, &create_conv).await; + assert_eq!(conv_resp.status(), StatusCode::OK); + let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap(); + let conv_id = conv_json["id"].as_str().unwrap(); + + // Create items + let create_items = serde_json::json!({ + "items": [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Hello"}] + }, + { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "Hi there!"}] + } + ] + }); + + let items_resp = router + .create_conversation_items(None, conv_id, &create_items) + .await; + assert_eq!(items_resp.status(), StatusCode::OK); + let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) + .await + .unwrap(); + let items_json: serde_json::Value = serde_json::from_slice(&items_bytes).unwrap(); + + // Verify response structure + assert_eq!(items_json["object"], "list"); + assert!(items_json["data"].is_array()); + + // Get first item + let item_id = items_json["data"][0]["id"].as_str().unwrap(); + let get_resp = router + .get_conversation_item(None, conv_id, item_id, None) + .await; + assert_eq!(get_resp.status(), StatusCode::OK); + let get_bytes = axum::body::to_bytes(get_resp.into_body(), usize::MAX) + .await + .unwrap(); + let get_json: serde_json::Value = serde_json::from_slice(&get_bytes).unwrap(); + + // Verify item structure + assert_eq!(get_json["id"], item_id); + assert_eq!(get_json["type"], "message"); + assert_eq!(get_json["role"], "user"); +} + +#[tokio::test] +async fn test_conversation_items_delete() { + // Test deleting an item from a conversation + let router_cfg = RouterConfig { + mode: RoutingMode::OpenAI { + worker_urls: vec!["http://localhost".to_string()], + }, + connection_mode: ConnectionMode::Http, + policy: PolicyConfig::Random, + host: "127.0.0.1".to_string(), + port: 0, + max_payload_size: 8 * 1024 * 1024, + request_timeout_secs: 60, + worker_startup_timeout_secs: 1, + worker_startup_check_interval_secs: 1, + dp_aware: false, + api_key: None, + discovery: None, + metrics: None, + log_dir: None, + log_level: Some("warn".to_string()), + request_id_headers: None, + max_concurrent_requests: 8, + queue_size: 0, + queue_timeout_secs: 5, + rate_limit_tokens_per_second: None, + cors_allowed_origins: vec![], + retry: RetryConfig::default(), + circuit_breaker: CircuitBreakerConfig::default(), + disable_retries: false, + disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), + enable_igw: false, + model_path: None, + tokenizer_path: None, + history_backend: sglang_router_rs::config::HistoryBackend::Memory, + oracle: None, + reasoning_parser: None, + tool_call_parser: None, + }; + + let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx"); + let router = RouterFactory::create_router(&Arc::new(ctx)) + .await + .expect("router"); + + // Create conversation + let create_conv = serde_json::json!({}); + let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap(); + let conv_id = conv_json["id"].as_str().unwrap(); + + // Create item + let create_items = serde_json::json!({ + "items": [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Test"}] + } + ] + }); + + let items_resp = router + .create_conversation_items(None, conv_id, &create_items) + .await; + let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) + .await + .unwrap(); + let items_json: serde_json::Value = serde_json::from_slice(&items_bytes).unwrap(); + let item_id = items_json["data"][0]["id"].as_str().unwrap(); + + // List items (should have 1) + let list_resp = router + .list_conversation_items(None, conv_id, None, None, None) + .await; + let list_bytes = axum::body::to_bytes(list_resp.into_body(), usize::MAX) + .await + .unwrap(); + let list_json: serde_json::Value = serde_json::from_slice(&list_bytes).unwrap(); + assert_eq!(list_json["data"].as_array().unwrap().len(), 1); + + // Delete item + let del_resp = router + .delete_conversation_item(None, conv_id, item_id) + .await; + assert_eq!(del_resp.status(), StatusCode::OK); + + // List items again (should have 0) + let list_resp2 = router + .list_conversation_items(None, conv_id, None, None, None) + .await; + let list_bytes2 = axum::body::to_bytes(list_resp2.into_body(), usize::MAX) + .await + .unwrap(); + let list_json2: serde_json::Value = serde_json::from_slice(&list_bytes2).unwrap(); + assert_eq!(list_json2["data"].as_array().unwrap().len(), 0); + + // Item should NOT be gettable from this conversation after deletion (link removed) + let get_resp = router + .get_conversation_item(None, conv_id, item_id, None) + .await; + assert_eq!(get_resp.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn test_conversation_items_max_limit() { + // Test that creating > 20 items returns error + let router_cfg = RouterConfig { + mode: RoutingMode::OpenAI { + worker_urls: vec!["http://localhost".to_string()], + }, + connection_mode: ConnectionMode::Http, + policy: PolicyConfig::Random, + host: "127.0.0.1".to_string(), + port: 0, + max_payload_size: 8 * 1024 * 1024, + request_timeout_secs: 60, + worker_startup_timeout_secs: 1, + worker_startup_check_interval_secs: 1, + dp_aware: false, + api_key: None, + discovery: None, + metrics: None, + log_dir: None, + log_level: Some("warn".to_string()), + request_id_headers: None, + max_concurrent_requests: 8, + queue_size: 0, + queue_timeout_secs: 5, + rate_limit_tokens_per_second: None, + cors_allowed_origins: vec![], + retry: RetryConfig::default(), + circuit_breaker: CircuitBreakerConfig::default(), + disable_retries: false, + disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), + enable_igw: false, + model_path: None, + tokenizer_path: None, + history_backend: sglang_router_rs::config::HistoryBackend::Memory, + oracle: None, + reasoning_parser: None, + tool_call_parser: None, + }; + + let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx"); + let router = RouterFactory::create_router(&Arc::new(ctx)) + .await + .expect("router"); + + // Create conversation + let create_conv = serde_json::json!({}); + let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap(); + let conv_id = conv_json["id"].as_str().unwrap(); + + // Try to create 21 items (over limit) + let mut items = Vec::new(); + for i in 0..21 { + items.push(serde_json::json!({ + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": format!("Message {}", i)}] + })); + } + let create_items = serde_json::json!({"items": items}); + + let items_resp = router + .create_conversation_items(None, conv_id, &create_items) + .await; + assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST); + + let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) + .await + .unwrap(); + let items_text = String::from_utf8_lossy(&items_bytes); + assert!(items_text.contains("Cannot add more than 20 items")); +} + +#[tokio::test] +async fn test_conversation_items_unsupported_type() { + // Test that unsupported item types return error + let router_cfg = RouterConfig { + mode: RoutingMode::OpenAI { + worker_urls: vec!["http://localhost".to_string()], + }, + connection_mode: ConnectionMode::Http, + policy: PolicyConfig::Random, + host: "127.0.0.1".to_string(), + port: 0, + max_payload_size: 8 * 1024 * 1024, + request_timeout_secs: 60, + worker_startup_timeout_secs: 1, + worker_startup_check_interval_secs: 1, + dp_aware: false, + api_key: None, + discovery: None, + metrics: None, + log_dir: None, + log_level: Some("warn".to_string()), + request_id_headers: None, + max_concurrent_requests: 8, + queue_size: 0, + queue_timeout_secs: 5, + rate_limit_tokens_per_second: None, + cors_allowed_origins: vec![], + retry: RetryConfig::default(), + circuit_breaker: CircuitBreakerConfig::default(), + disable_retries: false, + disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), + enable_igw: false, + model_path: None, + tokenizer_path: None, + history_backend: sglang_router_rs::config::HistoryBackend::Memory, + oracle: None, + reasoning_parser: None, + tool_call_parser: None, + }; + + let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx"); + let router = RouterFactory::create_router(&Arc::new(ctx)) + .await + .expect("router"); + + // Create conversation + let create_conv = serde_json::json!({}); + let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_json: serde_json::Value = serde_json::from_slice(&conv_bytes).unwrap(); + let conv_id = conv_json["id"].as_str().unwrap(); + + // Try to create item with completely unsupported type + let create_items = serde_json::json!({ + "items": [ + { + "type": "totally_invalid_type", + "content": [] + } + ] + }); + + let items_resp = router + .create_conversation_items(None, conv_id, &create_items) + .await; + assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST); + + let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) + .await + .unwrap(); + let items_text = String::from_utf8_lossy(&items_bytes); + assert!(items_text.contains("Unsupported item type")); +} + +#[tokio::test] +async fn test_conversation_items_multi_conversation_sharing() { + // Test that items can be shared across conversations via soft delete + let router_cfg = RouterConfig { + mode: RoutingMode::OpenAI { + worker_urls: vec!["http://localhost".to_string()], + }, + connection_mode: ConnectionMode::Http, + policy: PolicyConfig::Random, + host: "127.0.0.1".to_string(), + port: 0, + max_payload_size: 8 * 1024 * 1024, + request_timeout_secs: 60, + worker_startup_timeout_secs: 1, + worker_startup_check_interval_secs: 1, + dp_aware: false, + api_key: None, + discovery: None, + metrics: None, + log_dir: None, + log_level: Some("warn".to_string()), + request_id_headers: None, + max_concurrent_requests: 8, + queue_size: 0, + queue_timeout_secs: 5, + rate_limit_tokens_per_second: None, + cors_allowed_origins: vec![], + retry: RetryConfig::default(), + circuit_breaker: CircuitBreakerConfig::default(), + disable_retries: false, + disable_circuit_breaker: false, + health_check: HealthCheckConfig::default(), + enable_igw: false, + model_path: None, + tokenizer_path: None, + history_backend: sglang_router_rs::config::HistoryBackend::Memory, + oracle: None, + reasoning_parser: None, + tool_call_parser: None, + }; + + let ctx = AppContext::new(router_cfg, reqwest::Client::new(), 8, None).expect("ctx"); + let router = RouterFactory::create_router(&Arc::new(ctx)) + .await + .expect("router"); + + // Create two conversations + let conv_a_resp = router + .create_conversation(None, &serde_json::json!({})) + .await; + let conv_a_bytes = axum::body::to_bytes(conv_a_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_a_json: serde_json::Value = serde_json::from_slice(&conv_a_bytes).unwrap(); + let conv_a_id = conv_a_json["id"].as_str().unwrap(); + + let conv_b_resp = router + .create_conversation(None, &serde_json::json!({})) + .await; + let conv_b_bytes = axum::body::to_bytes(conv_b_resp.into_body(), usize::MAX) + .await + .unwrap(); + let conv_b_json: serde_json::Value = serde_json::from_slice(&conv_b_bytes).unwrap(); + let conv_b_id = conv_b_json["id"].as_str().unwrap(); + + // Create item in conversation A + let create_items = serde_json::json!({ + "items": [ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "Shared message"}] + } + ] + }); + + let items_a_resp = router + .create_conversation_items(None, conv_a_id, &create_items) + .await; + let items_a_bytes = axum::body::to_bytes(items_a_resp.into_body(), usize::MAX) + .await + .unwrap(); + let items_a_json: serde_json::Value = serde_json::from_slice(&items_a_bytes).unwrap(); + let item_id = items_a_json["data"][0]["id"].as_str().unwrap(); + + // Reference the same item in conversation B + let reference_items = serde_json::json!({ + "items": [ + { + "type": "item_reference", + "id": item_id + } + ] + }); + + let items_b_resp = router + .create_conversation_items(None, conv_b_id, &reference_items) + .await; + assert_eq!(items_b_resp.status(), StatusCode::OK); + + // Verify item appears in both conversations + let list_a = router + .list_conversation_items(None, conv_a_id, None, None, None) + .await; + let list_a_bytes = axum::body::to_bytes(list_a.into_body(), usize::MAX) + .await + .unwrap(); + let list_a_json: serde_json::Value = serde_json::from_slice(&list_a_bytes).unwrap(); + assert_eq!(list_a_json["data"].as_array().unwrap().len(), 1); + + let list_b = router + .list_conversation_items(None, conv_b_id, None, None, None) + .await; + let list_b_bytes = axum::body::to_bytes(list_b.into_body(), usize::MAX) + .await + .unwrap(); + let list_b_json: serde_json::Value = serde_json::from_slice(&list_b_bytes).unwrap(); + assert_eq!(list_b_json["data"].as_array().unwrap().len(), 1); + + // Delete from conversation A + router + .delete_conversation_item(None, conv_a_id, item_id) + .await; + + // Should be removed from A + let list_a2 = router + .list_conversation_items(None, conv_a_id, None, None, None) + .await; + let list_a2_bytes = axum::body::to_bytes(list_a2.into_body(), usize::MAX) + .await + .unwrap(); + let list_a2_json: serde_json::Value = serde_json::from_slice(&list_a2_bytes).unwrap(); + assert_eq!(list_a2_json["data"].as_array().unwrap().len(), 0); + + // Should still exist in B (soft delete) + let list_b2 = router + .list_conversation_items(None, conv_b_id, None, None, None) + .await; + let list_b2_bytes = axum::body::to_bytes(list_b2.into_body(), usize::MAX) + .await + .unwrap(); + let list_b2_json: serde_json::Value = serde_json::from_slice(&list_b2_bytes).unwrap(); + assert_eq!(list_b2_json["data"].as_array().unwrap().len(), 1); + + // Item should still be directly gettable + let get_resp = router + .get_conversation_item(None, conv_b_id, item_id, None) + .await; + assert_eq!(get_resp.status(), StatusCode::OK); +}