220 lines
8.1 KiB
Python
220 lines
8.1 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
from typing import List, Optional
|
|
|
|
from sglang.srt.entrypoints.openai.protocol import Tool
|
|
from sglang.srt.function_call.base_format_detector import BaseFormatDetector
|
|
from sglang.srt.function_call.core_types import (
|
|
StreamingParseResult,
|
|
ToolCallItem,
|
|
_GetInfoFunc,
|
|
)
|
|
from sglang.srt.parser.harmony_parser import HarmonyParser
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GptOssDetector(BaseFormatDetector):
|
|
"""
|
|
Detector for T4-style function calls using HarmonyParser.
|
|
|
|
Handles tool calls in the format:
|
|
<|channel|>commentary to={namespace.function}<|constrain|>json<|message|>{args}<|call|>
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.harmony_parser = HarmonyParser()
|
|
self.bot_token = "<|start|>assistant<|channel|>commentary"
|
|
self.eot_token = "<|call|>"
|
|
|
|
# Pattern to extract function name and JSON from tool_call event content
|
|
self.tool_extract_pattern = re.compile(
|
|
r"to=([a-zA-Z_][a-zA-Z0-9_.]*)\s*<\|constrain\|>json<\|message\|>(.*?)(?:<\|call\|>|$)",
|
|
re.DOTALL,
|
|
)
|
|
|
|
def has_tool_call(self, text: str) -> bool:
|
|
"""Check if text contains TypeScript-style function call markers."""
|
|
return self.bot_token in text
|
|
|
|
def detect_and_parse(self, text: str, tools: List[Tool]) -> StreamingParseResult:
|
|
"""Parse TypeScript-style function calls from complete text."""
|
|
if not self.has_tool_call(text):
|
|
return StreamingParseResult(normal_text=text, calls=[])
|
|
|
|
# Parse with HarmonyParser
|
|
events = self.harmony_parser.parse(text)
|
|
# Flush buffer for complete parsing
|
|
events += self.harmony_parser.parse("")
|
|
|
|
tool_indices = self._get_tool_indices(tools)
|
|
calls = []
|
|
normal_parts = []
|
|
tool_index = 0
|
|
|
|
for event in events:
|
|
if event.event_type == "tool_call":
|
|
# Extract tool call from event content
|
|
tool_call = self._extract_tool_call_from_event(
|
|
event.raw_text if event.raw_text else event.content,
|
|
tool_indices,
|
|
tool_index,
|
|
)
|
|
if tool_call:
|
|
calls.append(tool_call)
|
|
tool_index += 1
|
|
elif event.event_type == "normal":
|
|
normal_parts.append(event.content)
|
|
# Ignore reasoning events in function call context
|
|
|
|
normal_text = " ".join(normal_parts).strip()
|
|
return StreamingParseResult(normal_text=normal_text, calls=calls)
|
|
|
|
def parse_streaming_increment(
|
|
self, new_text: str, tools: List[Tool]
|
|
) -> StreamingParseResult:
|
|
"""Parse incremental streaming text for TypeScript-style function calls."""
|
|
self._buffer += new_text
|
|
|
|
# Always use HarmonyParser for parsing to ensure proper filtering
|
|
events = self.harmony_parser.parse(new_text)
|
|
|
|
# Quick check if we might have tool calls
|
|
if (
|
|
"<|channel|>commentary to=" not in self._buffer
|
|
and not self.current_tool_name_sent
|
|
):
|
|
# No tool calls detected, check for final content
|
|
if (
|
|
"<|channel|>final" in self._buffer
|
|
or "assistantfinal" in self._buffer.lower()
|
|
):
|
|
# Extract normal text from events
|
|
normal_text = "".join(
|
|
[e.content for e in events if e.event_type == "normal"]
|
|
)
|
|
if normal_text:
|
|
self._buffer = ""
|
|
return StreamingParseResult(normal_text=normal_text, calls=[])
|
|
|
|
# For other content, extract normal text from events (with filtering applied)
|
|
normal_text = "".join(
|
|
[e.content for e in events if e.event_type == "normal"]
|
|
)
|
|
if normal_text or events:
|
|
self._buffer = ""
|
|
return StreamingParseResult(normal_text=normal_text, calls=[])
|
|
else:
|
|
# No events processed, continue buffering
|
|
return StreamingParseResult(normal_text="", calls=[])
|
|
|
|
if not events:
|
|
# No complete events yet
|
|
return StreamingParseResult(normal_text="", calls=[])
|
|
|
|
# Initialize state if needed
|
|
if not hasattr(self, "_tool_indices"):
|
|
self._tool_indices = self._get_tool_indices(tools)
|
|
|
|
calls = []
|
|
normal_text = ""
|
|
|
|
for event in events:
|
|
if event.event_type == "tool_call":
|
|
# We got a complete tool call from HarmonyParser
|
|
tool_call_info = self._extract_tool_call_from_event(
|
|
event.raw_text if event.raw_text else event.content,
|
|
self._tool_indices,
|
|
self.current_tool_id if self.current_tool_id >= 0 else 0,
|
|
)
|
|
|
|
if tool_call_info:
|
|
# Initialize state if first tool
|
|
if self.current_tool_id == -1:
|
|
self.current_tool_id = 0
|
|
self.prev_tool_call_arr = []
|
|
self.streamed_args_for_tool = [""]
|
|
|
|
# Ensure arrays are large enough
|
|
while len(self.prev_tool_call_arr) <= self.current_tool_id:
|
|
self.prev_tool_call_arr.append({})
|
|
while len(self.streamed_args_for_tool) <= self.current_tool_id:
|
|
self.streamed_args_for_tool.append("")
|
|
|
|
# Store tool call info
|
|
self.prev_tool_call_arr[self.current_tool_id] = {
|
|
"name": tool_call_info.name,
|
|
"arguments": json.loads(tool_call_info.parameters),
|
|
}
|
|
|
|
# Emit the complete tool call at once
|
|
# (Could be modified to emit name first, then args, if needed)
|
|
calls.append(tool_call_info)
|
|
|
|
# Mark as streamed
|
|
self.streamed_args_for_tool[self.current_tool_id] = (
|
|
tool_call_info.parameters
|
|
)
|
|
|
|
# Move to next tool
|
|
self.current_tool_id += 1
|
|
self.current_tool_name_sent = False
|
|
|
|
elif event.event_type == "normal":
|
|
normal_text += event.content
|
|
|
|
# Clear buffer since HarmonyParser handles buffering
|
|
self._buffer = ""
|
|
|
|
return StreamingParseResult(normal_text=normal_text, calls=calls)
|
|
|
|
def _extract_tool_call_from_event(
|
|
self, content: str, tool_indices: dict, tool_index: int
|
|
) -> Optional[ToolCallItem]:
|
|
"""
|
|
Extract tool call information from HarmonyParser event content.
|
|
|
|
Content format: "commentary to=functions.get_weather<|constrain|>json<|message|>{...}"
|
|
"""
|
|
match = self.tool_extract_pattern.search(content)
|
|
|
|
if not match:
|
|
logger.debug(f"Could not extract tool call from: {content[:100]}")
|
|
return None
|
|
|
|
full_function_name = match.group(1)
|
|
json_content = match.group(2)
|
|
|
|
# Extract function name (last part after .)
|
|
function_name = (
|
|
full_function_name.split(".")[-1]
|
|
if "." in full_function_name
|
|
else full_function_name
|
|
)
|
|
|
|
# Check if tool exists
|
|
if function_name not in tool_indices:
|
|
logger.debug(f"Function {function_name} not in available tools")
|
|
return None
|
|
|
|
# Parse JSON arguments
|
|
try:
|
|
arguments = json.loads(json_content) if json_content.strip() else {}
|
|
except json.JSONDecodeError as e:
|
|
logger.debug(f"Failed to parse JSON arguments: {e}")
|
|
return None
|
|
|
|
return ToolCallItem(
|
|
tool_index=tool_index,
|
|
name=function_name,
|
|
parameters=json.dumps(arguments, ensure_ascii=False),
|
|
)
|
|
|
|
def structure_info(self) -> _GetInfoFunc:
|
|
raise NotImplementedError("structure_info not used with HarmonyParser")
|
|
|
|
def build_ebnf(self, tools: List[Tool]) -> str:
|
|
raise NotImplementedError("build_ebnf not used with HarmonyParser")
|