From 07c9d8fba2dede6903898d7a24870e97ad0ce050 Mon Sep 17 00:00:00 2001 From: Simo Lin Date: Thu, 28 Aug 2025 05:57:13 -0700 Subject: [PATCH] [router] add llama3.2 multi json streaming parser (#9735) --- .../src/tool_parser/parsers/json_parser.rs | 100 ++++++++++++++++-- sgl-router/tests/tool_parser_llama.rs | 76 ++++++++++++- 2 files changed, 164 insertions(+), 12 deletions(-) diff --git a/sgl-router/src/tool_parser/parsers/json_parser.rs b/sgl-router/src/tool_parser/parsers/json_parser.rs index 104383582..b8430dc9e 100644 --- a/sgl-router/src/tool_parser/parsers/json_parser.rs +++ b/sgl-router/src/tool_parser/parsers/json_parser.rs @@ -356,6 +356,81 @@ impl ToolParser for JsonParser { return Ok(StreamResult::Incomplete); } + // Extract JSON content first to check for separators + let extracted_json = self.extract_json_content(&state.buffer); + + // Handle multiple JSON objects with separators + // Check if we have a separator and potentially multiple JSON objects + let separator = &self.token_config.separator; + if !separator.is_empty() && extracted_json.contains(separator.as_str()) { + // Try to find a complete JSON object before the separator + if let Some(separator_pos) = extracted_json.find(separator.as_str()) { + // Get JSON before separator + let before_separator = &extracted_json[..separator_pos]; + + // Try to parse the JSON before the separator + match serde_json::from_str::(before_separator) { + Ok(value) => { + // Parse tool calls from this JSON + let tools = self.parse_json_value(&value)?; + if !tools.is_empty() { + // We need to figure out how much to remove from the original buffer + // Find where the separator is in the original buffer and remove up to and including it + if let Some(sep_in_original) = state.buffer.find(separator.as_str()) { + let remaining = + state.buffer[sep_in_original + separator.len()..].to_string(); + state.buffer = remaining; + } + + // Return the first tool as complete + if let Some(tool) = tools.into_iter().next() { + return Ok(StreamResult::ToolComplete(tool)); + } + } + } + Err(_) => { + // Failed to parse, continue to try other methods + } + } + } + } + + // Handle multiple start tokens (e.g., multiple <|python_tag|> markers) + if !self.token_config.start_tokens.is_empty() { + let start_token = &self.token_config.start_tokens[0]; + if !start_token.is_empty() { + // Find all occurrences of start token + let occurrences: Vec<_> = + state.buffer.match_indices(start_token.as_str()).collect(); + if occurrences.len() > 1 { + // We have multiple start tokens, try to process the first complete one + let first_pos = occurrences[0].0; + let second_pos = occurrences[1].0; + + // Extract content between first and second start token + let first_json_section = &state.buffer[first_pos..second_pos]; + let json_content = self.extract_json_content(first_json_section); + + // Try to parse this as complete JSON + if let Ok(value) = serde_json::from_str::(json_content) { + // Parse tool calls from this JSON + let tools = self.parse_json_value(&value)?; + if !tools.is_empty() { + // Remove the processed section from buffer + let remaining = state.buffer[second_pos..].to_string(); + state.buffer = remaining; + + // Return the first tool as complete + if let Some(tool) = tools.into_iter().next() { + return Ok(StreamResult::ToolComplete(tool)); + } + } + } + } + } + } + + // Regular single JSON parsing // Extract JSON content let json_content = self.extract_json_content(&state.buffer); @@ -364,16 +439,23 @@ impl ToolParser for JsonParser { Ok((value, consumed)) => { // Check if we have a complete JSON structure if consumed == json_content.len() { - // Complete JSON, parse tool calls - let tools = self.parse_json_value(&value)?; - if !tools.is_empty() { - // Clear buffer since we consumed everything - state.buffer.clear(); + // Check if this is truly complete or just has null from incomplete parsing + // We need to ensure the JSON actually ends properly (not cut off mid-key) + let trimmed = json_content.trim(); + let looks_complete = trimmed.ends_with('}') || trimmed.ends_with(']'); - // Return the first tool as complete - // TODO simplified version, address more complex version - if let Some(tool) = tools.into_iter().next() { - return Ok(StreamResult::ToolComplete(tool)); + if looks_complete { + // Complete JSON, parse tool calls + let tools = self.parse_json_value(&value)?; + if !tools.is_empty() { + // Clear buffer since we consumed everything + state.buffer.clear(); + + // Return the first tool as complete + // TODO simplified version, address more complex version + if let Some(tool) = tools.into_iter().next() { + return Ok(StreamResult::ToolComplete(tool)); + } } } } else { diff --git a/sgl-router/tests/tool_parser_llama.rs b/sgl-router/tests/tool_parser_llama.rs index 6222150ad..4f86ef2b2 100644 --- a/sgl-router/tests/tool_parser_llama.rs +++ b/sgl-router/tests/tool_parser_llama.rs @@ -341,14 +341,84 @@ async fn test_llama_streaming_multiple_tools() { let result = parser.parse_incremental(text, &mut state).await.unwrap(); - // Current implementation may handle this differently + // Should get first tool complete match result { sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => { - // At minimum should get first tool assert_eq!(tool.function.name, "func1"); } + _ => panic!("Expected first tool to be complete"), + } + + // Process remaining buffer to get second tool + let result2 = parser.parse_incremental("", &mut state).await.unwrap(); + match result2 { + sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => { + assert_eq!(tool.function.name, "func2"); + } + _ => panic!("Expected second tool to be complete"), + } +} + +#[tokio::test] +async fn test_llama_streaming_multiple_tools_chunked() { + // Test streaming multiple tool calls arriving in chunks + let parser = LlamaParser::new(); + let mut state = sglang_router_rs::tool_parser::ParseState::new(); + + // First chunk - incomplete first JSON + let chunk1 = r#"<|python_tag|>{"name": "get_weather", "arguments""#; + let result1 = parser.parse_incremental(chunk1, &mut state).await.unwrap(); + + // Should be incomplete or have tool name + match result1 { + sglang_router_rs::tool_parser::StreamResult::Incomplete + | sglang_router_rs::tool_parser::StreamResult::ToolName { .. } + | sglang_router_rs::tool_parser::StreamResult::ToolArguments { .. } => { + // Expected - could get tool name or be incomplete or even partial args + } + _ => panic!( + "Expected incomplete or tool name for partial JSON, got: {:?}", + result1 + ), + } + + // Second chunk - complete first JSON and separator + let chunk2 = r#": {"city": "Paris"}};{"name": "#; + let result2 = parser.parse_incremental(chunk2, &mut state).await.unwrap(); + + // Should get first tool complete + match result2 { + sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => { + assert_eq!(tool.function.name, "get_weather"); + let args: serde_json::Value = serde_json::from_str(&tool.function.arguments).unwrap(); + assert_eq!(args["city"], "Paris"); + } + _ => panic!("Expected first tool to be complete after separator"), + } + + // Third chunk - complete second JSON + let chunk3 = r#""get_time", "arguments": {"timezone": "UTC"}}"#; + let result3 = parser.parse_incremental(chunk3, &mut state).await.unwrap(); + + // Should get second tool complete + match result3 { + sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => { + assert_eq!(tool.function.name, "get_time"); + let args: serde_json::Value = serde_json::from_str(&tool.function.arguments).unwrap(); + assert_eq!(args["timezone"], "UTC"); + } _ => { - // Also acceptable if waiting for more + // If not complete yet, try one more empty chunk + let result4 = parser.parse_incremental("", &mut state).await.unwrap(); + match result4 { + sglang_router_rs::tool_parser::StreamResult::ToolComplete(tool) => { + assert_eq!(tool.function.name, "get_time"); + let args: serde_json::Value = + serde_json::from_str(&tool.function.arguments).unwrap(); + assert_eq!(args["timezone"], "UTC"); + } + _ => panic!("Expected second tool to be complete"), + } } } }