# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
|
|
Streaming SSE event builders for the Responses API.
|
|
|
|
Pure functions that translate streaming state + delta data into
|
|
OpenAI Response API SSE events. Used by the streaming event
|
|
processors in serving.py.
|
|
|
|
The file is organized as:
|
|
1. StreamingState dataclass + utility helpers
|
|
2. Shared leaf helpers — delta events (take plain strings, no context)
|
|
3. Shared leaf helpers — done events (take plain strings, no context)
|
|
4. Harmony-specific dispatchers (route ctx/previous_item → leaf helpers)
|
|
5. Harmony-specific tool lifecycle helpers
|
|
"""
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Final
|
|
|
|
from openai.types.responses import (
|
|
ResponseCodeInterpreterCallCodeDeltaEvent,
|
|
ResponseCodeInterpreterCallCodeDoneEvent,
|
|
ResponseCodeInterpreterCallCompletedEvent,
|
|
ResponseCodeInterpreterCallInProgressEvent,
|
|
ResponseCodeInterpreterCallInterpretingEvent,
|
|
ResponseCodeInterpreterToolCallParam,
|
|
ResponseContentPartAddedEvent,
|
|
ResponseContentPartDoneEvent,
|
|
ResponseFunctionCallArgumentsDeltaEvent,
|
|
ResponseFunctionCallArgumentsDoneEvent,
|
|
ResponseFunctionToolCall,
|
|
ResponseFunctionWebSearch,
|
|
ResponseMcpCallArgumentsDeltaEvent,
|
|
ResponseMcpCallArgumentsDoneEvent,
|
|
ResponseMcpCallCompletedEvent,
|
|
ResponseMcpCallInProgressEvent,
|
|
ResponseOutputItemAddedEvent,
|
|
ResponseOutputItemDoneEvent,
|
|
ResponseOutputMessage,
|
|
ResponseOutputText,
|
|
ResponseReasoningItem,
|
|
ResponseReasoningTextDeltaEvent,
|
|
ResponseReasoningTextDoneEvent,
|
|
ResponseTextDeltaEvent,
|
|
ResponseTextDoneEvent,
|
|
ResponseWebSearchCallCompletedEvent,
|
|
ResponseWebSearchCallInProgressEvent,
|
|
ResponseWebSearchCallSearchingEvent,
|
|
response_function_web_search,
|
|
)
|
|
from openai.types.responses.response_output_item import McpCall
|
|
from openai.types.responses.response_reasoning_item import (
|
|
Content as ResponseReasoningTextContent,
|
|
)
|
|
from openai_harmony import Message as HarmonyMessage
|
|
|
|
from vllm.entrypoints.mcp.tool_server import ToolServer
|
|
from vllm.entrypoints.openai.responses.context import StreamingHarmonyContext
|
|
from vllm.entrypoints.openai.responses.protocol import (
|
|
ResponseReasoningPartAddedEvent,
|
|
ResponseReasoningPartDoneEvent,
|
|
StreamingResponsesResponse,
|
|
)
|
|
from vllm.utils import random_uuid
|
|
|
|
# Maps built-in Harmony tool recipients to the ``server_label`` reported on
# MCP call items. Recipients not listed here fall back to using the
# recipient itself as the label (see ``_resolve_mcp_name_label``).
TOOL_NAME_TO_MCP_SERVER_LABEL: Final[dict[str, str]] = {
    "python": "code_interpreter",
    "container": "container",
    "browser": "web_search_preview",
}
|
|
|
|
|
|
def _resolve_mcp_name_label(recipient: str) -> tuple[str, str]:
|
|
"""Resolve MCP tool name and server label from a recipient string.
|
|
|
|
- ``mcp.*`` recipients: strip prefix, use the bare name as both
|
|
name and server_label.
|
|
- Everything else: use the recipient as the name and look up the
|
|
server_label in TOOL_NAME_TO_MCP_SERVER_LABEL.
|
|
"""
|
|
if recipient.startswith("mcp."):
|
|
name = recipient[len("mcp.") :]
|
|
return name, name
|
|
return recipient, TOOL_NAME_TO_MCP_SERVER_LABEL.get(recipient, recipient)
|
|
|
|
|
|
@dataclass
class StreamingState:
    """Mutable cursor shared by the SSE event builders in this module.

    The builders read these fields to fill in ids and indices on the events
    they produce, and update them as new output items are opened.
    """

    # Content-part index used on content/part events. Starts at -1 and is
    # incremented when an item's first content part is announced; it is not
    # reset by ``reset_for_new_item``.
    current_content_index: int = -1
    # Index of the output item currently being streamed.
    current_output_index: int = 0
    # Id of the output item currently being streamed ("" before any item).
    current_item_id: str = ""
    # call_id of the function call currently being streamed, if any.
    current_call_id: str = ""
    # Whether ``response.output_item.added`` was emitted for the current item.
    sent_output_item_added: bool = False
    # Set to True once the first function-call argument delta (and its
    # ``output_item.added``) has been emitted for the current item.
    is_first_function_call_delta: bool = False

    def reset_for_new_item(self) -> None:
        """Advance to the next output item and clear the per-item flags.

        ``current_item_id`` and ``current_content_index`` are left as-is.
        """
        self.current_call_id = ""
        self.is_first_function_call_delta = False
        self.sent_output_item_added = False
        self.current_output_index += 1
|
|
|
|
|
|
def is_mcp_tool_by_namespace(recipient: str | None) -> bool:
|
|
"""
|
|
Determine if a tool call is an MCP tool based on recipient prefix.
|
|
|
|
- Tools starting with "functions." are function calls
|
|
- Everything else is an MCP tool
|
|
"""
|
|
if recipient is None:
|
|
return False
|
|
|
|
# Function calls have "functions." prefix
|
|
# Everything else is an MCP tool
|
|
return not recipient.startswith("functions.")
|
|
|
|
|
|
# =====================================================================
|
|
# Shared leaf helpers — delta events
|
|
# =====================================================================
|
|
|
|
|
|
def emit_text_delta_events(
    delta: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the SSE events for one chunk of user-visible output text.

    The first chunk of a message (``state.sent_output_item_added`` is False)
    opens it. A fresh ``msg_`` id is stored in ``state.current_item_id``,
    ``state.current_content_index`` is advanced, and
    ``response.output_item.added`` and ``response.content_part.added`` are
    emitted. Every call then emits a ``response.output_text.delta`` carrying
    *delta*.

    Args:
        delta: Text produced since the previous chunk.
        state: Streaming cursor; mutated in place.

    Returns:
        The events to send, in order.
    """
    events: list[StreamingResponsesResponse] = []

    if not state.sent_output_item_added:
        # Open a new assistant message with a single, still-empty text part.
        state.sent_output_item_added = True
        state.current_item_id = f"msg_{random_uuid()}"
        opened_message = ResponseOutputMessage(
            id=state.current_item_id,
            type="message",
            role="assistant",
            content=[],
            status="in_progress",
        )
        events.append(
            ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item=opened_message,
            )
        )
        state.current_content_index += 1
        empty_text_part = ResponseOutputText(
            type="output_text",
            text="",
            annotations=[],
            logprobs=[],
        )
        events.append(
            ResponseContentPartAddedEvent(
                type="response.content_part.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item_id=state.current_item_id,
                content_index=state.current_content_index,
                part=empty_text_part,
            )
        )

    text_delta = ResponseTextDeltaEvent(
        type="response.output_text.delta",
        sequence_number=-1,
        content_index=state.current_content_index,
        output_index=state.current_output_index,
        item_id=state.current_item_id,
        delta=delta,
        # TODO: use logprobs from ctx.last_request_output
        logprobs=[],
    )
    events.append(text_delta)
    return events
|
|
|
|
|
|
def emit_reasoning_delta_events(
    delta: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the SSE events for one chunk of reasoning (analysis) text.

    The first chunk (``state.sent_output_item_added`` is False) opens a
    reasoning item. A fresh ``msg_`` id is stored in
    ``state.current_item_id``, ``state.current_content_index`` is advanced,
    and ``response.output_item.added`` and ``response.reasoning_part.added``
    are emitted. Every call then emits a ``response.reasoning_text.delta``
    carrying *delta*.

    Args:
        delta: Reasoning text produced since the previous chunk.
        state: Streaming cursor; mutated in place.

    Returns:
        The events to send, in order.
    """
    events: list[StreamingResponsesResponse] = []

    if not state.sent_output_item_added:
        # Open a new reasoning item with a single, still-empty text part.
        state.sent_output_item_added = True
        state.current_item_id = f"msg_{random_uuid()}"
        opened_reasoning = ResponseReasoningItem(
            type="reasoning",
            id=state.current_item_id,
            summary=[],
            status="in_progress",
        )
        events.append(
            ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item=opened_reasoning,
            )
        )
        state.current_content_index += 1
        empty_reasoning_part = ResponseReasoningTextContent(
            text="",
            type="reasoning_text",
        )
        events.append(
            ResponseReasoningPartAddedEvent(
                type="response.reasoning_part.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item_id=state.current_item_id,
                content_index=state.current_content_index,
                part=empty_reasoning_part,
            )
        )

    reasoning_delta = ResponseReasoningTextDeltaEvent(
        type="response.reasoning_text.delta",
        item_id=state.current_item_id,
        output_index=state.current_output_index,
        content_index=state.current_content_index,
        delta=delta,
        sequence_number=-1,
    )
    events.append(reasoning_delta)
    return events
|
|
|
|
|
|
def emit_function_call_delta_events(
    delta: str,
    function_name: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the SSE events for one chunk of function-call arguments.

    On the first chunk for the current item, tracked by
    ``state.is_first_function_call_delta`` (which flips to True once the
    item is opened), fresh ``fc_`` item and ``call_`` call ids are stored on
    *state*. A ``response.output_item.added`` event announcing an
    in-progress ``function_call`` item is also emitted. Every call then
    emits a ``response.function_call_arguments.delta`` with *delta*.

    Args:
        delta: Argument text produced since the previous chunk.
        function_name: Bare function name (without ``functions.``).
        state: Streaming cursor; mutated in place.

    Returns:
        The events to send, in order.
    """
    events: list[StreamingResponsesResponse] = []

    if not state.is_first_function_call_delta:
        state.is_first_function_call_delta = True
        state.current_item_id = f"fc_{random_uuid()}"
        state.current_call_id = f"call_{random_uuid()}"
        pending_call = ResponseFunctionToolCall(
            name=function_name,
            type="function_call",
            id=state.current_item_id,
            call_id=state.current_call_id,
            arguments="",
            status="in_progress",
        )
        events.append(
            ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item=pending_call,
            )
        )

    # The argument delta is emitted on every call, including the first one.
    events.append(
        ResponseFunctionCallArgumentsDeltaEvent(
            item_id=state.current_item_id,
            delta=delta,
            output_index=state.current_output_index,
            sequence_number=-1,
            type="response.function_call_arguments.delta",
        )
    )
    return events
|
|
|
|
|
|
def emit_mcp_delta_events(
    delta: str,
    state: StreamingState,
    recipient: str,
) -> list[StreamingResponsesResponse]:
    """Build the SSE events for one chunk of MCP tool-call arguments.

    The tool name and server label come from ``_resolve_mcp_name_label``.
    On the first chunk (``state.sent_output_item_added`` is False) a fresh
    ``mcp_`` id is stored in ``state.current_item_id``. That chunk also
    emits ``response.output_item.added`` (an in-progress ``mcp_call``) and
    ``response.mcp_call.in_progress``. Every call then emits a
    ``response.mcp_call_arguments.delta`` with *delta*.

    Args:
        delta: Argument text produced since the previous chunk.
        state: Streaming cursor; mutated in place.
        recipient: Harmony recipient of the tool call.

    Returns:
        The events to send, in order.
    """
    tool_name, server_label = _resolve_mcp_name_label(recipient)
    events: list[StreamingResponsesResponse] = []

    if not state.sent_output_item_added:
        state.sent_output_item_added = True
        state.current_item_id = f"mcp_{random_uuid()}"
        pending_call = McpCall(
            type="mcp_call",
            id=state.current_item_id,
            name=tool_name,
            arguments="",
            server_label=server_label,
            status="in_progress",
        )
        events += [
            ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item=pending_call,
            ),
            ResponseMcpCallInProgressEvent(
                type="response.mcp_call.in_progress",
                sequence_number=-1,
                output_index=state.current_output_index,
                item_id=state.current_item_id,
            ),
        ]

    events.append(
        ResponseMcpCallArgumentsDeltaEvent(
            type="response.mcp_call_arguments.delta",
            sequence_number=-1,
            output_index=state.current_output_index,
            item_id=state.current_item_id,
            delta=delta,
        )
    )
    return events
|
|
|
|
|
|
def emit_code_interpreter_delta_events(
    delta: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the SSE events for one chunk of code-interpreter source code.

    On the first chunk (``state.sent_output_item_added`` is False) a fresh
    ``tool_`` id is stored in ``state.current_item_id``. That chunk also
    emits ``response.output_item.added`` (an in-progress
    ``code_interpreter_call`` with ``container_id="auto"``) and
    ``response.code_interpreter_call.in_progress``. Every call then emits a
    ``response.code_interpreter_call_code.delta`` with *delta*.

    Args:
        delta: Code text produced since the previous chunk.
        state: Streaming cursor; mutated in place.

    Returns:
        The events to send, in order.
    """
    events: list[StreamingResponsesResponse] = []

    if not state.sent_output_item_added:
        state.sent_output_item_added = True
        state.current_item_id = f"tool_{random_uuid()}"
        pending_call = ResponseCodeInterpreterToolCallParam(
            type="code_interpreter_call",
            id=state.current_item_id,
            code=None,
            container_id="auto",
            outputs=None,
            status="in_progress",
        )
        events += [
            ResponseOutputItemAddedEvent(
                type="response.output_item.added",
                sequence_number=-1,
                output_index=state.current_output_index,
                item=pending_call,
            ),
            ResponseCodeInterpreterCallInProgressEvent(
                type="response.code_interpreter_call.in_progress",
                sequence_number=-1,
                output_index=state.current_output_index,
                item_id=state.current_item_id,
            ),
        ]

    events.append(
        ResponseCodeInterpreterCallCodeDeltaEvent(
            type="response.code_interpreter_call_code.delta",
            sequence_number=-1,
            output_index=state.current_output_index,
            item_id=state.current_item_id,
            delta=delta,
        )
    )
    return events
|
|
|
|
|
|
# =====================================================================
|
|
# Shared leaf helpers — done events
|
|
# =====================================================================
|
|
|
|
|
|
def emit_text_output_done_events(
    text: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the closing events for a finished user-visible text message.

    Emits, in order, ``response.output_text.done``,
    ``response.content_part.done`` and ``response.output_item.done``. All
    three use the ids/indices currently held by *state*; the final message
    carries a single ``output_text`` part containing *text*.

    Args:
        text: The complete message text.
        state: Streaming cursor; only read.

    Returns:
        The events to send, in order.
    """
    final_part = ResponseOutputText(
        type="output_text",
        text=text,
        annotations=[],
    )
    return [
        ResponseTextDoneEvent(
            type="response.output_text.done",
            sequence_number=-1,
            output_index=state.current_output_index,
            content_index=state.current_content_index,
            text=text,
            logprobs=[],
            item_id=state.current_item_id,
        ),
        ResponseContentPartDoneEvent(
            type="response.content_part.done",
            sequence_number=-1,
            item_id=state.current_item_id,
            output_index=state.current_output_index,
            content_index=state.current_content_index,
            part=final_part,
        ),
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=state.current_output_index,
            item=ResponseOutputMessage(
                id=state.current_item_id,
                type="message",
                role="assistant",
                content=[final_part],
                status="completed",
            ),
        ),
    ]
|
|
|
|
|
|
def emit_reasoning_done_events(
    text: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the closing events for a finished reasoning (analysis) item.

    Emits, in order, ``response.reasoning_text.done``,
    ``response.reasoning_part.done`` and ``response.output_item.done``. All
    three use the ids/indices currently held by *state*; the completed
    reasoning item carries one ``reasoning_text`` part with *text*.

    Args:
        text: The complete reasoning text.
        state: Streaming cursor; only read.

    Returns:
        The events to send, in order.
    """
    reasoning_part = ResponseReasoningTextContent(
        text=text,
        type="reasoning_text",
    )
    completed_item = ResponseReasoningItem(
        type="reasoning",
        content=[reasoning_part],
        status="completed",
        id=state.current_item_id,
        summary=[],
    )
    return [
        ResponseReasoningTextDoneEvent(
            type="response.reasoning_text.done",
            item_id=state.current_item_id,
            sequence_number=-1,
            output_index=state.current_output_index,
            content_index=state.current_content_index,
            text=text,
        ),
        ResponseReasoningPartDoneEvent(
            type="response.reasoning_part.done",
            sequence_number=-1,
            item_id=state.current_item_id,
            output_index=state.current_output_index,
            content_index=state.current_content_index,
            part=reasoning_part,
        ),
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=state.current_output_index,
            item=completed_item,
        ),
    ]
|
|
|
|
|
|
def emit_function_call_done_events(
    function_name: str,
    arguments: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Emit events when a function call completes.

    Emits ``response.function_call_arguments.done`` followed by
    ``response.output_item.done`` for the function-call item currently
    tracked by *state*.

    Args:
        function_name: Bare function name (``functions.`` prefix removed).
        arguments: The complete, accumulated argument string.
        state: Streaming cursor; only read. Its ``current_item_id`` and
            ``current_call_id`` identify the item that
            ``emit_function_call_delta_events`` announced.

    Returns:
        The events to send, in order.
    """
    events: list[StreamingResponsesResponse] = []
    events.append(
        ResponseFunctionCallArgumentsDoneEvent(
            type="response.function_call_arguments.done",
            arguments=arguments,
            name=function_name,
            item_id=state.current_item_id,
            output_index=state.current_output_index,
            sequence_number=-1,
        )
    )
    # The completed item is identified by ``id``, the same field set on the
    # in-progress item in ``emit_function_call_delta_events``. ``item_id``,
    # ``output_index`` and ``sequence_number`` belong to events, not to the
    # item itself, so they are not passed here.
    function_call_item = ResponseFunctionToolCall(
        type="function_call",
        id=state.current_item_id,
        arguments=arguments,
        name=function_name,
        call_id=state.current_call_id,
        status="completed",
    )
    events.append(
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=state.current_output_index,
            item=function_call_item,
        )
    )
    return events
|
|
|
|
|
|
def emit_mcp_completion_events(
    recipient: str,
    arguments: str,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the closing events for a finished MCP tool call.

    Emits, in order, ``response.mcp_call_arguments.done``,
    ``response.mcp_call.completed`` and ``response.output_item.done`` (a
    completed ``mcp_call``). The ids/indices come from *state*, and the
    name and server label come from ``_resolve_mcp_name_label``.

    Args:
        recipient: Harmony recipient of the tool call.
        arguments: The complete argument string.
        state: Streaming cursor; only read.

    Returns:
        The events to send, in order.
    """
    tool_name, server_label = _resolve_mcp_name_label(recipient)
    item_id = state.current_item_id
    output_index = state.current_output_index
    return [
        ResponseMcpCallArgumentsDoneEvent(
            type="response.mcp_call_arguments.done",
            arguments=arguments,
            name=tool_name,
            item_id=item_id,
            output_index=output_index,
            sequence_number=-1,
        ),
        ResponseMcpCallCompletedEvent(
            type="response.mcp_call.completed",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=output_index,
            item=McpCall(
                type="mcp_call",
                arguments=arguments,
                name=tool_name,
                id=item_id,
                server_label=server_label,
                status="completed",
            ),
        ),
    ]
|
|
|
|
|
|
# =====================================================================
|
|
# Harmony-specific dispatchers
|
|
# =====================================================================
|
|
|
|
|
|
def emit_content_delta_events(
    ctx: StreamingHarmonyContext,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Route the latest Harmony content delta to the matching leaf helper.

    Harmony-specific dispatcher: it reads the delta, channel and recipient
    from *ctx* and delegates to the shared helpers above.

    * no recipient, ``final``/``commentary`` channel -> output text
    * no recipient, ``analysis`` channel -> reasoning text
    * recipient on ``commentary``/``analysis`` -> function call
      (``functions.*``), code interpreter (``python``) or MCP tool (others)

    Returns an empty list when there is no delta or no route applies.
    """
    delta = ctx.last_content_delta
    if not delta:
        return []

    channel = ctx.parser.current_channel
    recipient = ctx.parser.current_recipient

    if recipient is None:
        # Preambles (commentary with no recipient) and final messages are
        # both user-visible text; analysis with no recipient is reasoning.
        if channel in ("final", "commentary"):
            return emit_text_delta_events(delta, state)
        if channel == "analysis":
            return emit_reasoning_delta_events(delta, state)
        return []

    # Built-in tools are normally triggered on the analysis channel, but
    # models occasionally address them from commentary as well.
    if channel not in ("commentary", "analysis"):
        return []
    if recipient.startswith("functions."):
        function_name = recipient[len("functions.") :]
        return emit_function_call_delta_events(delta, function_name, state)
    if recipient == "python":
        return emit_code_interpreter_delta_events(delta, state)
    # ``mcp.*`` and every other non-``functions.`` recipient count as MCP.
    if is_mcp_tool_by_namespace(recipient):
        return emit_mcp_delta_events(delta, state, recipient)
    return []
|
|
|
|
|
|
def emit_previous_item_done_events(
    previous_item: HarmonyMessage,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Close out the previous Harmony message before a new one starts.

    Harmony-specific dispatcher: it inspects the parsed message's
    recipient and channel and delegates to the shared done helpers.

    * ``functions.*`` recipient -> function-call done events
    * ``python`` recipient -> code-interpreter completion events
    * other recipient, when the current item id starts with ``mcp_`` ->
      MCP completion events
    * no recipient, ``analysis`` channel -> reasoning done events
    * no recipient, ``commentary``/``final`` channel -> text done events

    Returns an empty list when nothing applies.
    """
    text = previous_item.content[0].text
    recipient = previous_item.recipient

    if recipient is None:
        if previous_item.channel == "analysis":
            return emit_reasoning_done_events(text, state)
        if previous_item.channel in ("commentary", "final"):
            # Preambles (commentary with no recipient) and final messages
            # are both user-visible text.
            return emit_text_output_done_events(text, state)
        return []

    # Tool call.
    if recipient.startswith("functions."):
        function_name = recipient[len("functions.") :]
        return emit_function_call_done_events(function_name, text, state)
    if recipient == "python":
        return emit_code_interpreter_completion_events(previous_item, state)
    if (
        is_mcp_tool_by_namespace(recipient)
        and state.current_item_id is not None
        and state.current_item_id.startswith("mcp_")
    ):
        return emit_mcp_completion_events(recipient, text, state)
    return []
|
|
|
|
|
|
# =====================================================================
|
|
# Harmony-specific tool lifecycle helpers
|
|
# =====================================================================
|
|
|
|
|
|
def emit_browser_tool_events(
    previous_item: HarmonyMessage,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Emit the full lifecycle of a browser (web search) tool call.

    The browser sub-command is the recipient suffix after ``browser.``, and
    its JSON arguments are the first content part of *previous_item*:

    * ``search`` -> ``ActionSearch`` using ``query``
    * ``open``   -> ``ActionOpenPage`` with a ``cursor:<cursor>`` URL
    * ``find``   -> ``ActionFind`` using ``pattern`` and a ``cursor:`` URL

    A fresh ``tool_`` id is stored in ``state.current_item_id``. The
    returned events are, in order: output_item.added,
    web_search_call.in_progress, web_search_call.searching,
    web_search_call.completed and output_item.done.

    Raises:
        ValueError: For an unknown sub-command. Malformed arguments also
            surface as errors from ``json.loads`` or missing keys.
    """
    command = previous_item.recipient[len("browser.") :]
    args = json.loads(previous_item.content[0].text)

    if command == "search":
        action = response_function_web_search.ActionSearch(
            type="search",
            query=args["query"],
        )
    elif command == "open":
        action = response_function_web_search.ActionOpenPage(
            type="open_page",
            # TODO: translate the cursor into a real URL
            url=f"cursor:{args.get('cursor', '')}",
        )
    elif command == "find":
        action = response_function_web_search.ActionFind(
            type="find",
            pattern=args["pattern"],
            # TODO: translate the cursor into a real URL
            url=f"cursor:{args.get('cursor', '')}",
        )
    else:
        raise ValueError(f"Unknown function name: {command}")

    state.current_item_id = f"tool_{random_uuid()}"
    item_id = state.current_item_id
    output_index = state.current_output_index

    def _web_search_item(status: str) -> ResponseFunctionWebSearch:
        # Same id and action for the in-progress and completed snapshots.
        return ResponseFunctionWebSearch(
            type="web_search_call",
            id=item_id,
            action=action,
            status=status,
        )

    return [
        ResponseOutputItemAddedEvent(
            type="response.output_item.added",
            sequence_number=-1,
            output_index=output_index,
            item=_web_search_item("in_progress"),
        ),
        ResponseWebSearchCallInProgressEvent(
            type="response.web_search_call.in_progress",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseWebSearchCallSearchingEvent(
            type="response.web_search_call.searching",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseWebSearchCallCompletedEvent(
            type="response.web_search_call.completed",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=output_index,
            item=_web_search_item("completed"),
        ),
    ]
|
|
|
|
|
|
def emit_code_interpreter_completion_events(
    previous_item: HarmonyMessage,
    state: StreamingState,
) -> list[StreamingResponsesResponse]:
    """Build the closing events for a finished code-interpreter call.

    The code is the first content part of *previous_item*. Emits, in order,
    ``response.code_interpreter_call_code.done``,
    ``response.code_interpreter_call.interpreting``,
    ``response.code_interpreter_call.completed`` and
    ``response.output_item.done``. The last is a completed
    ``code_interpreter_call`` with ``container_id="auto"`` and no outputs.

    Args:
        previous_item: The parsed Harmony message addressed to ``python``.
        state: Streaming cursor; only read.

    Returns:
        The events to send, in order.
    """
    code = previous_item.content[0].text
    item_id = state.current_item_id
    output_index = state.current_output_index
    return [
        ResponseCodeInterpreterCallCodeDoneEvent(
            type="response.code_interpreter_call_code.done",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
            code=code,
        ),
        ResponseCodeInterpreterCallInterpretingEvent(
            type="response.code_interpreter_call.interpreting",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseCodeInterpreterCallCompletedEvent(
            type="response.code_interpreter_call.completed",
            sequence_number=-1,
            output_index=output_index,
            item_id=item_id,
        ),
        ResponseOutputItemDoneEvent(
            type="response.output_item.done",
            sequence_number=-1,
            output_index=output_index,
            item=ResponseCodeInterpreterToolCallParam(
                type="code_interpreter_call",
                id=item_id,
                code=code,
                container_id="auto",
                outputs=[],
                status="completed",
            ),
        ),
    ]
|
|
|
|
|
|
def emit_tool_action_events(
    ctx: StreamingHarmonyContext,
    state: StreamingState,
    tool_server: ToolServer | None,
) -> list[StreamingResponsesResponse]:
    """Emit events for a tool action turn.

    Does nothing unless ``ctx.is_assistant_action_turn()`` is true and the
    parser has at least one message. The last parsed message is treated as
    the tool call.

    * ``browser.*`` recipients (when *tool_server* has a ``browser`` tool)
      get the full web-search lifecycle from ``emit_browser_tool_events``.
    * If an output item has already been opened
      (``state.sent_output_item_added``), ``python`` recipients then get
      code-interpreter completion events. Other non-``functions.``
      recipients get MCP completion events.

    Args:
        ctx: Harmony streaming context.
        state: Streaming cursor; the helpers called here may mutate it.
        tool_server: Used to check tool availability. When None, no events
            are emitted.

    Returns:
        The events to send, in order (possibly empty).
    """
    # Only act on assistant action turns that have produced a message.
    if not ctx.is_assistant_action_turn() or len(ctx.parser.messages) == 0:
        return []

    events: list[StreamingResponsesResponse] = []
    # The most recently parsed message is treated as the tool call.
    previous_item = ctx.parser.messages[-1]

    # Handle browser tool: emits added/in_progress/searching/completed/done
    # at once and assigns a new tool_ id to state.current_item_id.
    if (
        tool_server is not None
        and tool_server.has_tool("browser")
        and previous_item.recipient is not None
        and previous_item.recipient.startswith("browser.")
    ):
        events.extend(emit_browser_tool_events(previous_item, state))

    # Handle tool completion for an item opened earlier in the stream.
    # current_item_id is declared as str (default ""), so the None check
    # only matters if something assigns None explicitly.
    # NOTE(review): a "browser.*" recipient also passes
    # is_mcp_tool_by_namespace, so when sent_output_item_added is True it
    # reaches the MCP branch below too — after the browser branch above has
    # already replaced current_item_id. Confirm that emitting MCP completion
    # events for browser calls is intended.
    if (
        tool_server is not None
        and previous_item.recipient is not None
        and state.current_item_id is not None
        and state.sent_output_item_added
    ):
        recipient = previous_item.recipient
        if recipient == "python":
            events.extend(emit_code_interpreter_completion_events(previous_item, state))
        elif recipient.startswith("mcp.") or is_mcp_tool_by_namespace(recipient):
            events.extend(
                emit_mcp_completion_events(
                    recipient, previous_item.content[0].text, state
                )
            )

    return events
|