Signed-off-by: Xinyuan Tong <justinning0323@outlook.com> Signed-off-by: Xinyuan Tong <xinyuantong.cs@gmail.com> Co-authored-by: Xinyuan Tong <justinning0323@outlook.com> Co-authored-by: Xinyuan Tong <xinyuantong.cs@gmail.com>
371 lines
13 KiB
Python
371 lines
13 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
import datetime
|
|
import json
|
|
from collections.abc import Iterable
|
|
from typing import Literal, Optional, Union
|
|
|
|
from openai.types.responses import (
|
|
ResponseOutputItem,
|
|
ResponseOutputMessage,
|
|
ResponseOutputText,
|
|
ResponseReasoningItem,
|
|
)
|
|
from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall
|
|
from openai.types.responses.response_function_web_search import (
|
|
ActionFind,
|
|
ActionOpenPage,
|
|
ActionSearch,
|
|
ResponseFunctionWebSearch,
|
|
)
|
|
from openai.types.responses.response_reasoning_item import (
|
|
Content as ResponseReasoningTextContent,
|
|
)
|
|
from openai.types.responses.tool import Tool
|
|
from openai_harmony import (
|
|
Author,
|
|
Conversation,
|
|
DeveloperContent,
|
|
HarmonyEncodingName,
|
|
Message,
|
|
ReasoningEffort,
|
|
Role,
|
|
StreamableParser,
|
|
SystemContent,
|
|
TextContent,
|
|
ToolDescription,
|
|
load_harmony_encoding,
|
|
)
|
|
|
|
from sglang.srt.entrypoints.openai.protocol import ResponseInputOutputItem
|
|
from sglang.srt.utils import random_uuid
|
|
|
|
# Maps the request-level reasoning effort strings onto the Harmony enum values.
REASONING_EFFORT: dict[str, ReasoningEffort] = {
    "high": ReasoningEffort.HIGH,
    "medium": ReasoningEffort.MEDIUM,
    "low": ReasoningEffort.LOW,
}

# Cache slot for the Harmony encoding; filled in on first use by get_encoding().
_harmony_encoding = None
|
|
|
|
|
|
def get_encoding():
    """Return the Harmony GPT-OSS encoding, loading it on the first call.

    The loaded encoding is stored in the module-level ``_harmony_encoding``
    so that subsequent calls return the same object without reloading.
    """
    global _harmony_encoding
    if _harmony_encoding is not None:
        return _harmony_encoding
    _harmony_encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
    return _harmony_encoding
|
|
|
|
|
|
def get_system_message(
    model_identity: Optional[str] = None,
    reasoning_effort: Optional[Literal["high", "medium", "low"]] = None,
    start_date: Optional[str] = None,
    browser_description: Optional[str] = None,
    python_description: Optional[str] = None,
) -> Message:
    """Build the Harmony system message.

    Args:
        model_identity: Optional identity string placed in the system content.
        reasoning_effort: Optional effort level; looked up in
            ``REASONING_EFFORT`` (an unknown key raises ``KeyError``).
        start_date: Conversation start date. When ``None``, today's local
            date formatted as ``YYYY-MM-DD`` is used.
        browser_description: Optional browser tool description passed to
            ``with_tools``.
        python_description: Optional python tool description passed to
            ``with_tools``.

    Returns:
        A ``Message`` with the system role carrying the assembled content.
    """
    content = SystemContent.new()

    if model_identity is not None:
        content = content.with_model_identity(model_identity)

    if reasoning_effort is not None:
        effort = REASONING_EFFORT[reasoning_effort]
        content = content.with_reasoning_effort(effort)

    # The start date is always set; only its source differs.
    date_text = (
        start_date
        if start_date is not None
        else datetime.datetime.now().strftime("%Y-%m-%d")
    )
    content = content.with_conversation_start_date(date_text)

    # Built-in tools are registered in a fixed order: browser first, then python.
    for tool_description in (browser_description, python_description):
        if tool_description is not None:
            content = content.with_tools(tool_description)

    return Message.from_role_and_content(Role.SYSTEM, content)
|
|
|
|
|
|
def get_developer_message(
    instructions: Optional[str] = None, tools: Optional[list[Tool]] = None
) -> Message:
    """Build the Harmony developer message.

    Args:
        instructions: Optional instruction text added to the developer content.
        tools: Optional tool list. ``function`` tools are described in the
            developer message; ``web_search_preview`` and ``code_interpreter``
            are skipped here because they belong to the system message.

    Returns:
        A ``Message`` with the developer role.

    Raises:
        ValueError: If a tool has a type other than the three listed above.
    """
    content = DeveloperContent.new()
    if instructions is not None:
        content = content.with_instructions(instructions)

    if tools is not None:
        # These are built-in tools that are added to the system message.
        builtin_types = ("web_search_preview", "code_interpreter")

        # First pass: validate every tool type and collect the function tools.
        user_functions = []
        for candidate in tools:
            if candidate.type in builtin_types:
                continue
            if candidate.type != "function":
                raise ValueError(f"tool type {candidate.type} not supported")
            user_functions.append(candidate)

        # Second pass: only touch the content when there is something to add.
        if user_functions:
            descriptions = []
            for fn_tool in user_functions:
                descriptions.append(
                    ToolDescription.new(
                        name=fn_tool.name,
                        description=fn_tool.description,
                        parameters=fn_tool.parameters,
                    )
                )
            content = content.with_function_tools(descriptions)

    return Message.from_role_and_content(Role.DEVELOPER, content)
|
|
|
|
|
|
def get_user_message(content: str) -> Message:
    """Wrap ``content`` in a Harmony message authored by the user role."""
    user_msg = Message.from_role_and_content(Role.USER, content)
    return user_msg
|
|
|
|
|
|
def parse_response_input(
    response_msg: ResponseInputOutputItem,
    prev_responses: list[Union[ResponseOutputItem, ResponseReasoningItem]],
) -> Message:
    """Convert one Responses-API input item into a Harmony ``Message``.

    Args:
        response_msg: The input item, either a dict or an object exposing
            ``model_dump()`` (it is converted to a dict first).
        prev_responses: Earlier output items; searched newest-first to find
            the function call that a ``function_call_output`` item answers.

    Returns:
        The Harmony message for the item.

    Raises:
        ValueError: If a ``function_call_output`` has no matching prior call,
            if a ``reasoning`` item does not carry exactly one content entry,
            or if the item type is not recognized.
    """
    if not isinstance(response_msg, dict):
        response_msg = response_msg.model_dump()

    if "type" not in response_msg or response_msg["type"] == "message":
        role = response_msg["role"]
        content = response_msg["content"]
        if role == "system":
            # User is trying to set a system message. Change it to:
            # <|start|>developer<|message|># Instructions
            # {instructions}<|end|>
            role = "developer"
            text_prefix = "Instructions:\n"
        else:
            text_prefix = ""
        if isinstance(content, str):
            msg = Message.from_role_and_content(role, text_prefix + content)
        else:
            contents = [TextContent(text=text_prefix + c["text"]) for c in content]
            msg = Message.from_role_and_contents(role, contents)
    elif response_msg["type"] == "function_call_output":
        call_id = response_msg["call_id"]
        call_response: Optional[ResponseFunctionToolCall] = None
        # Walk backwards so the most recent call with this id wins.
        for prev_response in reversed(prev_responses):
            if (
                isinstance(prev_response, ResponseFunctionToolCall)
                and prev_response.call_id == call_id
            ):
                call_response = prev_response
                break
        if call_response is None:
            raise ValueError(f"No call message found for {call_id}")
        msg = Message.from_author_and_content(
            Author.new(Role.TOOL, f"functions.{call_response.name}"),
            response_msg["output"],
        )
    elif response_msg["type"] == "reasoning":
        content = response_msg["content"]
        # Validate explicitly instead of with `assert`: asserts are stripped
        # under `python -O`, and every other bad-input path here raises
        # ValueError.
        if len(content) != 1:
            raise ValueError(
                "Expected exactly one reasoning content item, "
                f"got {len(content)}"
            )
        msg = Message.from_role_and_content(Role.ASSISTANT, content[0]["text"])
    elif response_msg["type"] == "function_call":
        msg = Message.from_role_and_content(Role.ASSISTANT, response_msg["arguments"])
        msg = msg.with_channel("commentary")
        msg = msg.with_recipient(f"functions.{response_msg['name']}")
        msg = msg.with_content_type("json")
    else:
        raise ValueError(f"Unknown input type: {response_msg['type']}")
    return msg
|
|
|
|
|
|
def parse_response_output(output: ResponseOutputItem) -> Message:
    """Convert a previously produced Responses-API output item into a Harmony message.

    Args:
        output: Either a ``ResponseOutputMessage`` or a
            ``ResponseFunctionToolCall``.

    Returns:
        For an output message, a message with the same role and one text
        content per content entry. For a function tool call, an assistant
        message on the ``commentary`` channel, addressed to
        ``functions.<name>`` with content type ``json``.

    Raises:
        ValueError: If ``output`` is of any other type.
    """
    if isinstance(output, ResponseOutputMessage):
        role = output.role
        contents = [TextContent(text=c.text) for c in output.content]
        return Message.from_role_and_contents(role, contents)

    if isinstance(output, ResponseFunctionToolCall):
        msg = Message.from_role_and_content(Role.ASSISTANT, output.arguments)
        msg = msg.with_channel("commentary")
        # Function recipients live in the "functions." namespace: this matches
        # parse_response_input for "function_call" items and the prefix that
        # parse_output_message strips when it produces the tool call name.
        msg = msg.with_recipient(f"functions.{output.name}")
        msg = msg.with_content_type("json")
        return msg

    raise ValueError(f"Unknown output type: {type(output)}")
|
|
|
|
|
|
def parse_chat_input(chat_msg) -> Message:
    """Convert a chat-completions style message into a Harmony ``Message``.

    ``chat_msg.content`` may be a plain string or an iterable of parts that
    each expose a ``.text`` attribute; both become ``TextContent`` entries.
    """
    role = chat_msg.role
    body = chat_msg.content
    if not isinstance(body, str):
        # TODO: Support refusal.
        parts = [TextContent(text=part.text) for part in body]
    else:
        parts = [TextContent(text=body)]
    return Message.from_role_and_contents(role, parts)
|
|
|
|
|
|
def render_for_completion(messages: list[Message]) -> list[int]:
    """Render ``messages`` into prompt token ids for an assistant completion."""
    convo = Conversation.from_messages(messages)
    encoding = get_encoding()
    return encoding.render_conversation_for_completion(convo, Role.ASSISTANT)
|
|
|
|
|
|
def get_stop_tokens_for_assistant_actions() -> list[int]:
    """Return the encoding's stop token ids for assistant actions."""
    encoding = get_encoding()
    return encoding.stop_tokens_for_assistant_actions()
|
|
|
|
|
|
def get_streamable_parser_for_assistant() -> StreamableParser:
    """Create a new ``StreamableParser`` for the assistant role using the shared encoding."""
    encoding = get_encoding()
    return StreamableParser(encoding, role=Role.ASSISTANT)
|
|
|
|
|
|
def _reasoning_item_from_text(text: str) -> ResponseReasoningItem:
    """Build a reasoning output item holding a single reasoning-text entry."""
    return ResponseReasoningItem(
        id=f"rs_{random_uuid()}",
        type="reasoning",
        summary=[],
        content=[ResponseReasoningTextContent(text=text, type="reasoning_text")],
        status=None,
    )


def parse_output_message(message: Message):
    """Translate one parsed Harmony message into Responses-API output items.

    Args:
        message: A completed Harmony message.

    Returns:
        A list of output items. It is empty for non-assistant authors.
        Otherwise it holds one web-search call for ``browser.*`` recipients,
        one reasoning item per content entry for the ``analysis`` channel and
        for ``commentary`` messages addressed to python/browser, one function
        tool call per content entry for ``commentary`` messages addressed to
        ``functions.*``, or a single output message for the ``final`` channel.

    Raises:
        ValueError: For a browser message without exactly one content entry,
            an unknown browser action, an unknown (or missing) commentary
            recipient, or an unknown channel.
    """
    if message.author.role != "assistant":
        # This is a message from a tool to the assistant (e.g., search result).
        # Don't include it in the final output for now. This aligns with
        # OpenAI's behavior on models like o4-mini.
        return []

    output_items = []
    recipient = message.recipient
    if recipient is not None and recipient.startswith("browser."):
        if len(message.content) != 1:
            raise ValueError("Invalid number of contents in browser message")
        content = message.content[0]
        browser_call = json.loads(content.text)
        # TODO: translate to url properly!
        if recipient == "browser.search":
            action = ActionSearch(
                query=f"cursor:{browser_call.get('query', '')}", type="search"
            )
        elif recipient == "browser.open":
            action = ActionOpenPage(
                url=f"cursor:{browser_call.get('url', '')}", type="open_page"
            )
        elif recipient == "browser.find":
            action = ActionFind(
                pattern=browser_call["pattern"],
                url=f"cursor:{browser_call.get('url', '')}",
                type="find",
            )
        else:
            raise ValueError(f"Unknown browser action: {recipient}")
        web_search_item = ResponseFunctionWebSearch(
            id=f"ws_{random_uuid()}",
            action=action,
            status="completed",
            type="web_search_call",
        )
        output_items.append(web_search_item)
    elif message.channel == "analysis":
        for content in message.content:
            output_items.append(_reasoning_item_from_text(content.text))
    elif message.channel == "commentary":
        # A commentary message without a recipient previously crashed with
        # AttributeError on `None.startswith`; report it through the same
        # ValueError used for any other unsupported recipient.
        if recipient is None:
            raise ValueError(f"Unknown recipient: {recipient}")
        if recipient.startswith("functions."):
            function_name = recipient.split(".")[-1]
            for content in message.content:
                # call_id and id share one uuid.
                random_id = random_uuid()
                response_item = ResponseFunctionToolCall(
                    arguments=content.text,
                    call_id=f"call_{random_id}",
                    type="function_call",
                    name=function_name,
                    id=f"ft_{random_id}",
                )
                output_items.append(response_item)
        elif recipient.startswith(("python", "browser")):
            # Built-in tool invocations on the commentary channel are
            # reported as reasoning items.
            for content in message.content:
                output_items.append(_reasoning_item_from_text(content.text))
        else:
            raise ValueError(f"Unknown recipient: {recipient}")
    elif message.channel == "final":
        contents = [
            ResponseOutputText(
                text=content.text,
                annotations=[],  # TODO
                type="output_text",
                logprobs=None,  # TODO
            )
            for content in message.content
        ]
        text_item = ResponseOutputMessage(
            id=f"msg_{random_uuid()}",
            content=contents,
            role=message.author.role,
            status="completed",
            type="message",
        )
        output_items.append(text_item)
    else:
        raise ValueError(f"Unknown channel: {message.channel}")
    return output_items
|
|
|
|
|
|
def parse_remaining_state(parser: StreamableParser):
    """Turn the parser's in-progress (not yet completed) message into output items.

    Args:
        parser: A ``StreamableParser`` that has already consumed the
            generated tokens.

    Returns:
        An empty list when there is no pending content, when the pending
        message is not from the assistant, when it is addressed to a
        ``browser.*`` recipient, or when its channel is neither ``analysis``
        nor ``final``. Otherwise a one-element list: a reasoning item for the
        ``analysis`` channel, or an assistant output message for ``final``.
    """
    if not parser.current_content:
        return []
    if parser.current_role != Role.ASSISTANT:
        return []
    current_recipient = parser.current_recipient
    if current_recipient is not None and current_recipient.startswith("browser."):
        return []

    if parser.current_channel == "analysis":
        reasoning_item = ResponseReasoningItem(
            id=f"rs_{random_uuid()}",
            type="reasoning",
            summary=[],
            content=[
                ResponseReasoningTextContent(
                    text=parser.current_content, type="reasoning_text"
                )
            ],
            status=None,
        )
        return [reasoning_item]
    elif parser.current_channel == "final":
        # ResponseOutputText carries its payload in `text`. The previous code
        # passed a reasoning `content=[...]` list (copied from the analysis
        # branch) and omitted the required `text` field.
        output_text = ResponseOutputText(
            text=parser.current_content,
            annotations=[],  # TODO
            type="output_text",
            logprobs=None,  # TODO
        )
        text_item = ResponseOutputMessage(
            id=f"msg_{random_uuid()}",
            content=[output_text],
            role="assistant",
            status="completed",
            type="message",
        )
        return [text_item]
    return []
|
|
|
|
|
|
def parse_output_into_messages(token_ids: Iterable[int]):
    """Feed every token id to a fresh assistant parser and return that parser."""
    assistant_parser = get_streamable_parser_for_assistant()
    feed = assistant_parser.process
    for tok in token_ids:
        feed(tok)
    return assistant_parser
|