# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import datetime
import json
from collections.abc import Iterable, Sequence
from typing import Literal

from openai.types.responses import (
    ResponseFunctionToolCall,
    ResponseOutputItem,
    ResponseOutputMessage,
    ResponseOutputText,
    ResponseReasoningItem,
)
from openai.types.responses.response_function_web_search import (
    ActionFind,
    ActionOpenPage,
    ActionSearch,
    ResponseFunctionWebSearch,
)
from openai.types.responses.response_output_item import McpCall
from openai.types.responses.response_reasoning_item import (
    Content as ResponseReasoningTextContent,
)
from openai.types.responses.tool import Tool
from openai_harmony import (
    Author,
    ChannelConfig,
    Conversation,
    DeveloperContent,
    HarmonyEncodingName,
    Message,
    ReasoningEffort,
    Role,
    StreamableParser,
    SystemContent,
    TextContent,
    ToolDescription,
    load_harmony_encoding,
)
from openai_harmony import Message as OpenAIHarmonyMessage
from openai_harmony import Role as OpenAIHarmonyRole

from vllm import envs
from vllm.entrypoints.openai.protocol import (
    ChatCompletionToolsParam,
    ResponseInputOutputItem,
    ResponsesRequest,
)
from vllm.utils import random_uuid

REASONING_EFFORT = {
    "high": ReasoningEffort.HIGH,
    "medium": ReasoningEffort.MEDIUM,
    "low": ReasoningEffort.LOW,
}

_harmony_encoding = None

# Builtin tools that should be included in the system message when
# they are available and requested by the user.
# Tool args are provided by MCP tool descriptions. Output
# of the tools are stringified.
MCP_BUILTIN_TOOLS: set[str] = {
    "web_search_preview",
    "code_interpreter",
    "container",
}

# Recipient prefixes of the built-in tools whose calls are reported as
# reasoning items rather than as MCP calls.
_BUILTIN_RECIPIENT_PREFIXES = ("python", "browser", "container")


def has_custom_tools(tool_types: set[str]) -> bool:
    """
    Checks if the given tool types are custom tools
    (i.e. any tool other than MCP builtin tools)
    """
    return not tool_types.issubset(MCP_BUILTIN_TOOLS)


def get_encoding():
    """Return the Harmony GPT-OSS encoding, loading it on first use."""
    global _harmony_encoding
    if _harmony_encoding is None:
        _harmony_encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
    return _harmony_encoding


def get_system_message(
    model_identity: str | None = None,
    reasoning_effort: Literal["high", "medium", "low"] | None = None,
    start_date: str | None = None,
    browser_description: str | None = None,
    python_description: str | None = None,
    container_description: str | None = None,
    instructions: str | None = None,
    with_custom_tools: bool = False,
) -> Message:
    """
    Build the Harmony system message.

    Args:
        model_identity: Optional model identity text.
        reasoning_effort: Optional key into ``REASONING_EFFORT``.
        start_date: Conversation start date; defaults to today's local date
            formatted as ``%Y-%m-%d``.
        browser_description: Optional tool description added via ``with_tools``.
        python_description: Optional tool description added via ``with_tools``.
        container_description: Optional tool description added via
            ``with_tools``.
        instructions: Appended to the model identity, but only when
            ``envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS`` is set.
        with_custom_tools: When false, the "commentary" channel is removed
            from the set of valid channels.

    Returns:
        A ``Message`` with the system role.
    """
    sys_msg_content = SystemContent.new()
    if model_identity is not None:
        sys_msg_content = sys_msg_content.with_model_identity(model_identity)
    if instructions is not None and envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS:
        current_identity = sys_msg_content.model_identity
        new_identity = (
            f"{current_identity}\n{instructions}" if current_identity else instructions
        )
        sys_msg_content = sys_msg_content.with_model_identity(new_identity)
    if reasoning_effort is not None:
        sys_msg_content = sys_msg_content.with_reasoning_effort(
            REASONING_EFFORT[reasoning_effort]
        )
    if start_date is None:
        # NOTE(woosuk): This brings non-determinism in vLLM. Be careful.
        start_date = datetime.datetime.now().strftime("%Y-%m-%d")
    sys_msg_content = sys_msg_content.with_conversation_start_date(start_date)
    if browser_description is not None:
        sys_msg_content = sys_msg_content.with_tools(browser_description)
    if python_description is not None:
        sys_msg_content = sys_msg_content.with_tools(python_description)
    if container_description is not None:
        sys_msg_content = sys_msg_content.with_tools(container_description)
    if not with_custom_tools:
        channel_config = sys_msg_content.channel_config
        invalid_channel = "commentary"
        new_config = ChannelConfig.require_channels(
            [c for c in channel_config.valid_channels if c != invalid_channel]
        )
        sys_msg_content = sys_msg_content.with_channel_config(new_config)
    sys_msg = Message.from_role_and_content(Role.SYSTEM, sys_msg_content)
    return sys_msg


def create_tool_definition(tool: ChatCompletionToolsParam | Tool):
    """
    Convert a function tool into a Harmony ``ToolDescription``.

    Chat Completion tools keep name/description/parameters under
    ``tool.function``; other tools expose them directly on the tool object.
    """
    if isinstance(tool, ChatCompletionToolsParam):
        return ToolDescription.new(
            name=tool.function.name,
            description=tool.function.description,
            parameters=tool.function.parameters,
        )
    return ToolDescription.new(
        name=tool.name,
        description=tool.description,
        parameters=tool.parameters,
    )


def get_developer_message(
    instructions: str | None = None,
    tools: list[Tool | ChatCompletionToolsParam] | None = None,
) -> Message:
    """
    Build the Harmony developer message.

    Args:
        instructions: Added as developer instructions unless
            ``envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS`` is set.
        tools: Tools from the request. Only ``function`` tools are described
            here; builtin tools (``MCP_BUILTIN_TOOLS``) are skipped.

    Raises:
        ValueError: If a tool has an unsupported type.
    """
    dev_msg_content = DeveloperContent.new()
    if instructions is not None and not envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS:
        dev_msg_content = dev_msg_content.with_instructions(instructions)
    if tools is not None:
        function_tools: list[Tool | ChatCompletionToolsParam] = []
        for tool in tools:
            if tool.type in MCP_BUILTIN_TOOLS:
                # Builtin tools are not described in the developer message.
                continue
            if tool.type == "function":
                function_tools.append(tool)
            else:
                raise ValueError(f"tool type {tool.type} not supported")
        if function_tools:
            function_tool_descriptions = [
                create_tool_definition(tool) for tool in function_tools
            ]
            dev_msg_content = dev_msg_content.with_function_tools(
                function_tool_descriptions
            )
    dev_msg = Message.from_role_and_content(Role.DEVELOPER, dev_msg_content)
    return dev_msg


def get_user_message(content: str) -> Message:
    """Wrap ``content`` in a Harmony message with the user role."""
    return Message.from_role_and_content(Role.USER, content)


def _build_function_call_message(name: str, arguments: str) -> Message:
    """Build the assistant message that represents a call to ``functions.<name>``."""
    msg = Message.from_role_and_content(Role.ASSISTANT, arguments)
    msg = msg.with_channel("commentary")
    msg = msg.with_recipient(f"functions.{name}")
    # Officially, this should be `<|constrain|>json` but there is not clear
    # evidence that improves accuracy over `json` and some anecdotes to the
    # contrary. Further testing of the different content_types is needed.
    msg = msg.with_content_type("json")
    return msg


def parse_response_input(
    response_msg: ResponseInputOutputItem,
    prev_responses: list[ResponseOutputItem | ResponseReasoningItem],
) -> Message:
    """
    Convert one Responses API input item into a Harmony message.

    Args:
        response_msg: The input item, as a dict or an object with
            ``model_dump()``.
        prev_responses: Earlier output items, searched (most recent first)
            for the function call matching a ``function_call_output``.

    Raises:
        ValueError: If the item type is unknown, a function call output has
            no matching call, or a reasoning item does not contain exactly
            one content part.
    """
    if not isinstance(response_msg, dict):
        response_msg = response_msg.model_dump()
    if "type" not in response_msg or response_msg["type"] == "message":
        role = response_msg["role"]
        content = response_msg["content"]
        if role == "system":
            # User is trying to set a system message. Change it to:
            # <|start|>developer<|message|># Instructions
            # {instructions}<|end|>
            role = "developer"
            text_prefix = "Instructions:\n"
        else:
            text_prefix = ""
        if isinstance(content, str):
            msg = Message.from_role_and_content(role, text_prefix + content)
        else:
            contents = [TextContent(text=text_prefix + c["text"]) for c in content]
            msg = Message.from_role_and_contents(role, contents)
        if role == "assistant":
            msg = msg.with_channel("final")
    elif response_msg["type"] == "function_call_output":
        call_id = response_msg["call_id"]
        call_response: ResponseFunctionToolCall | None = None
        for prev_response in reversed(prev_responses):
            if (
                isinstance(prev_response, ResponseFunctionToolCall)
                and prev_response.call_id == call_id
            ):
                call_response = prev_response
                break
        if call_response is None:
            raise ValueError(f"No call message found for {call_id}")
        msg = Message.from_author_and_content(
            Author.new(Role.TOOL, f"functions.{call_response.name}"),
            response_msg["output"],
        )
    elif response_msg["type"] == "reasoning":
        content = response_msg["content"]
        # Validate explicitly instead of asserting: this is request input,
        # and asserts are stripped when Python runs with -O.
        if not content or len(content) != 1:
            raise ValueError(
                "Reasoning input items must contain exactly one content part"
            )
        msg = Message.from_role_and_content(Role.ASSISTANT, content[0]["text"])
    elif response_msg["type"] == "function_call":
        msg = _build_function_call_message(
            response_msg["name"], response_msg["arguments"]
        )
    else:
        raise ValueError(f"Unknown input type: {response_msg['type']}")
    return msg


def parse_chat_inputs_to_harmony_messages(chat_msgs: list) -> list[Message]:
    """
    Parse a list of messages from request.messages in the Chat Completion API to
    Harmony messages.
    """
    # Normalize Pydantic models to dicts once so the tool-id scan below can
    # use dict access, matching what parse_chat_input_to_harmony_message accepts.
    normalized_msgs = [
        chat_msg
        if isinstance(chat_msg, dict)
        else chat_msg.model_dump(exclude_none=True)
        for chat_msg in chat_msgs
    ]

    msgs: list[Message] = []
    tool_id_names: dict[str, str] = {}

    # Collect tool id to name mappings for tool response recipient values.
    # `or` guards against clients sending an explicit null for these fields.
    for chat_msg in normalized_msgs:
        for tool_call in chat_msg.get("tool_calls") or []:
            function = tool_call.get("function") or {}
            tool_id_names[tool_call.get("id")] = function.get("name")

    for chat_msg in normalized_msgs:
        msgs.extend(parse_chat_input_to_harmony_message(chat_msg, tool_id_names))

    msgs = auto_drop_analysis_messages(msgs)
    return msgs


def auto_drop_analysis_messages(msgs: list[Message]) -> list[Message]:
    """
    Harmony models expect the analysis messages (representing raw chain of thought)
    to be dropped after an assistant message to the final channel is produced from
    the reasoning of those messages.

    The openai-harmony library does this if the very last assistant message is to
    the final channel, but it does not handle the case where we're in longer
    multi-turn conversations and the client gave us reasoning content from previous
    turns of the conversation with multiple assistant messages to the final channel
    in the conversation.

    So, we find the index of the last assistant message to the final channel and
    drop all analysis messages that precede it, leaving only the analysis messages
    that are relevant to the current part of the conversation.
    """
    last_assistant_final_index = next(
        (
            i
            for i in range(len(msgs) - 1, -1, -1)
            if msgs[i].author.role == "assistant" and msgs[i].channel == "final"
        ),
        -1,
    )

    return [
        msg
        for i, msg in enumerate(msgs)
        if not (i < last_assistant_final_index and msg.channel == "analysis")
    ]


def flatten_chat_text_content(content: str | list | None) -> str | None:
    """
    Extract the text parts from a chat message content field and flatten them
    into a single string.

    Non-list values (a string or None) are returned unchanged.
    """
    if isinstance(content, list):
        return "".join(
            item.get("text", "")
            for item in content
            if isinstance(item, dict) and item.get("type") == "text"
        )
    return content


def parse_chat_input_to_harmony_message(
    chat_msg, tool_id_names: dict[str, str] | None = None
) -> list[Message]:
    """
    Parse a message from request.messages in the Chat Completion API to
    Harmony messages.

    Args:
        chat_msg: A chat message as a dict or an object with ``model_dump()``.
        tool_id_names: Mapping from tool call id to function name, used to
            name the author of tool-role messages.

    Returns:
        Zero or more Harmony messages derived from ``chat_msg``.
    """
    tool_id_names = tool_id_names or {}

    if not isinstance(chat_msg, dict):
        # Handle Pydantic models
        chat_msg = chat_msg.model_dump(exclude_none=True)

    role = chat_msg.get("role")
    msgs: list[Message] = []

    # Assistant message with tool calls
    tool_calls = chat_msg.get("tool_calls", [])

    if role == "assistant" and tool_calls:
        content = flatten_chat_text_content(chat_msg.get("content"))
        if content:
            commentary_msg = Message.from_role_and_content(Role.ASSISTANT, content)
            commentary_msg = commentary_msg.with_channel("commentary")
            msgs.append(commentary_msg)

        reasoning_content = chat_msg.get("reasoning") or chat_msg.get(
            "reasoning_content"
        )
        if reasoning_content:
            analysis_msg = Message.from_role_and_content(
                Role.ASSISTANT, reasoning_content
            )
            analysis_msg = analysis_msg.with_channel("analysis")
            msgs.append(analysis_msg)

        for call in tool_calls:
            func = call.get("function", {})
            name = func.get("name", "")
            arguments = func.get("arguments", "") or ""
            msgs.append(_build_function_call_message(name, arguments))
        return msgs

    # Tool role message (tool output)
    if role == "tool":
        tool_call_id = chat_msg.get("tool_call_id", "")
        name = tool_id_names.get(tool_call_id, "")
        content = chat_msg.get("content", "") or ""
        content = flatten_chat_text_content(content)

        msg = (
            Message.from_author_and_content(
                Author.new(Role.TOOL, f"functions.{name}"), content
            )
            .with_channel("commentary")
            .with_recipient("assistant")
        )
        return [msg]

    # Non-tool reasoning content
    reasoning_content = chat_msg.get("reasoning") or chat_msg.get("reasoning_content")
    if role == "assistant" and reasoning_content:
        analysis_msg = Message.from_role_and_content(Role.ASSISTANT, reasoning_content)
        analysis_msg = analysis_msg.with_channel("analysis")
        msgs.append(analysis_msg)

    # Default: user/assistant/system messages with content.
    # `or ""` already maps a missing or null content to an empty string.
    content = chat_msg.get("content") or ""
    if isinstance(content, str):
        contents = [TextContent(text=content)]
    else:
        # TODO: Support refusal.
        contents = [TextContent(text=c.get("text", "")) for c in content]

    # Only add assistant messages if they have content, as reasoning or tool calling
    # assistant messages were already added above.
    if role == "assistant" and contents and contents[0].text:
        msg = Message.from_role_and_contents(role, contents)
        # Send non-tool assistant messages to the final channel
        msg = msg.with_channel("final")
        msgs.append(msg)
    # For user/system/developer messages, add them directly even if no content.
    elif role != "assistant":
        msg = Message.from_role_and_contents(role, contents)
        msgs.append(msg)

    return msgs


def parse_input_to_harmony_message(chat_msg) -> list[Message]:
    """
    Parse a message from request.previous_input_messages in the Responses API to
    Harmony messages.
    """
    if not isinstance(chat_msg, dict):
        # Handle Pydantic models
        chat_msg = chat_msg.model_dump(exclude_none=True)

    role = chat_msg.get("role")

    # Assistant message with tool calls
    tool_calls = chat_msg.get("tool_calls")
    if role == "assistant" and tool_calls:
        msgs: list[Message] = []
        for call in tool_calls:
            func = call.get("function", {})
            name = func.get("name", "")
            arguments = func.get("arguments", "") or ""
            msgs.append(_build_function_call_message(name, arguments))
        return msgs

    # Tool role message (tool output)
    if role == "tool":
        name = chat_msg.get("name", "")
        content = chat_msg.get("content", "") or ""
        content = flatten_chat_text_content(content)

        msg = Message.from_author_and_content(
            Author.new(Role.TOOL, f"functions.{name}"), content
        ).with_channel("commentary")
        return [msg]

    # Default: user/assistant/system messages with content
    content = chat_msg.get("content", "")
    if content is None:
        # An explicit null content would otherwise fail when iterated below.
        content = ""
    if isinstance(content, str):
        contents = [TextContent(text=content)]
    else:
        # TODO: Support refusal.
        contents = [TextContent(text=c.get("text", "")) for c in content]
    msg = Message.from_role_and_contents(role, contents)
    return [msg]


def construct_harmony_previous_input_messages(
    request: ResponsesRequest,
) -> list[OpenAIHarmonyMessage]:
    """
    Convert ``request.previous_input_messages`` into Harmony messages,
    dropping every system and developer message.
    """
    # To match OpenAI, instructions, reasoning and tools are
    # always taken from the most recent Responses API request
    # not carried over from previous requests
    skipped_roles = (OpenAIHarmonyRole.SYSTEM, OpenAIHarmonyRole.DEVELOPER)

    messages: list[OpenAIHarmonyMessage] = []
    for message in request.previous_input_messages or []:
        # Handle both OpenAIHarmonyMessage objects and dictionary inputs
        if isinstance(message, OpenAIHarmonyMessage):
            candidates = [message]
        else:
            candidates = parse_input_to_harmony_message(message)
        messages.extend(
            candidate
            for candidate in candidates
            if candidate.author.role not in skipped_roles
        )
    return messages


def render_for_completion(messages: list[Message]) -> list[int]:
    """Render ``messages`` into prompt token ids for an assistant completion."""
    conversation = Conversation.from_messages(messages)
    token_ids = get_encoding().render_conversation_for_completion(
        conversation, Role.ASSISTANT
    )
    return token_ids


def _parse_browser_tool_call(message: Message, recipient: str) -> ResponseOutputItem:
    """
    Parse browser tool calls (search, open, find) into web search items.

    Raises:
        ValueError: If the message does not have exactly one content part or
            the recipient is not a known browser action.
    """
    if len(message.content) != 1:
        raise ValueError("Invalid number of contents in browser message")
    content = message.content[0]

    # Parse JSON args (with retry detection)
    try:
        browser_call = json.loads(content.text)
    except json.JSONDecodeError:
        browser_call = None
    if not isinstance(browser_call, dict):
        # Covers both undecodable text and valid JSON that is not an object
        # (e.g. a list or string), which cannot be queried with .get() below.
        json_retry_output_message = (
            f"Invalid JSON args, caught and retried: {content.text}"
        )
        browser_call = {
            "query": json_retry_output_message,
            "url": json_retry_output_message,
            "pattern": json_retry_output_message,
        }

    # Create appropriate action based on recipient
    if recipient == "browser.search":
        action = ActionSearch(
            query=f"cursor:{browser_call.get('query', '')}", type="search"
        )
    elif recipient == "browser.open":
        action = ActionOpenPage(
            url=f"cursor:{browser_call.get('url', '')}", type="open_page"
        )
    elif recipient == "browser.find":
        action = ActionFind(
            pattern=browser_call.get("pattern", ""),
            url=f"cursor:{browser_call.get('url', '')}",
            type="find",
        )
    else:
        raise ValueError(f"Unknown browser action: {recipient}")

    return ResponseFunctionWebSearch(
        id=f"ws_{random_uuid()}",
        action=action,
        status="completed",
        type="web_search_call",
    )


def _parse_function_call(message: Message, recipient: str) -> list[ResponseOutputItem]:
    """Parse function calls into function tool call items."""
    function_name = recipient.split(".")[-1]
    output_items = []
    for content in message.content:
        # The call id and the item id share the same random suffix.
        random_id = random_uuid()
        response_item = ResponseFunctionToolCall(
            arguments=content.text,
            call_id=f"call_{random_id}",
            type="function_call",
            name=function_name,
            id=f"fc_{random_id}",
        )
        output_items.append(response_item)
    return output_items


def _parse_reasoning_content(message: Message) -> list[ResponseOutputItem]:
    """Parse reasoning/analysis content into reasoning items."""
    return [
        ResponseReasoningItem(
            id=f"rs_{random_uuid()}",
            summary=[],
            type="reasoning",
            content=[
                ResponseReasoningTextContent(text=content.text, type="reasoning_text")
            ],
            status=None,
        )
        for content in message.content
    ]


def _parse_final_message(message: Message) -> ResponseOutputItem:
    """Parse final channel messages into output message items."""
    contents = [
        ResponseOutputText(
            text=content.text,
            annotations=[],  # TODO
            type="output_text",
            logprobs=None,  # TODO
        )
        for content in message.content
    ]
    return ResponseOutputMessage(
        id=f"msg_{random_uuid()}",
        content=contents,
        role=message.author.role,
        status="completed",
        type="message",
    )


def _parse_mcp_recipient(recipient: str) -> tuple[str, str]:
    """
    Parse MCP recipient into (server_label, tool_name).

    For dotted recipients like "repo_browser.list":
        - server_label: "repo_browser" (namespace/server)
        - tool_name: "list" (specific tool)

    For simple recipients like "filesystem":
        - server_label: "filesystem"
        - tool_name: "filesystem"
    """
    # Without a dot, split() yields the whole recipient as its only element,
    # so the first and last parts are both the recipient itself.
    parts = recipient.split(".")
    return parts[0], parts[-1]


def _parse_mcp_call(message: Message, recipient: str) -> list[ResponseOutputItem]:
    """Parse MCP calls into MCP call items."""
    server_label, tool_name = _parse_mcp_recipient(recipient)
    return [
        McpCall(
            arguments=content.text,
            type="mcp_call",
            name=tool_name,
            server_label=server_label,
            id=f"mcp_{random_uuid()}",
            status="completed",
        )
        for content in message.content
    ]


def parse_output_message(message: Message) -> list[ResponseOutputItem]:
    """
    Parse a Harmony message into a list of output response items.

    Raises:
        ValueError: If a message without a recipient is on an unknown channel.
    """
    if message.author.role != "assistant":
        # This is a message from a tool to the assistant (e.g., search result).
        # Don't include it in the final output for now. This aligns with
        # OpenAI's behavior on models like o4-mini.
        return []

    output_items: list[ResponseOutputItem] = []
    recipient = message.recipient

    if recipient is not None:
        # Browser tool calls
        if recipient.startswith("browser."):
            output_items.append(_parse_browser_tool_call(message, recipient))

        # Function calls (should only happen on commentary channel)
        elif message.channel == "commentary" and recipient.startswith("functions."):
            output_items.extend(_parse_function_call(message, recipient))

        # Built-in tool recipients (python/browser/container)
        # generate reasoning output
        elif recipient.startswith(_BUILTIN_RECIPIENT_PREFIXES):
            output_items.extend(_parse_reasoning_content(message))

        # All other recipients are MCP calls
        else:
            output_items.extend(_parse_mcp_call(message, recipient))

    # No recipient - handle based on channel for non-tool messages
    elif message.channel == "analysis":
        output_items.extend(_parse_reasoning_content(message))

    elif message.channel == "commentary":
        # Per Harmony format, commentary channel can contain preambles to calling
        # multiple functions - explanatory text with no recipient
        output_items.extend(_parse_reasoning_content(message))

    elif message.channel == "final":
        output_items.append(_parse_final_message(message))

    else:
        raise ValueError(f"Unknown channel: {message.channel}")

    return output_items


def parse_remaining_state(parser: StreamableParser) -> list[ResponseOutputItem]:
    """
    Turn the parser's unfinished (partial) message into output items.

    Returns an empty list when there is no pending content, when the pending
    message is not from the assistant, or when it is a ``browser.`` call.
    """
    if not parser.current_content:
        return []
    if parser.current_role != Role.ASSISTANT:
        return []
    current_recipient = parser.current_recipient
    if current_recipient is not None and current_recipient.startswith("browser."):
        return []

    if current_recipient and parser.current_channel in ("commentary", "analysis"):
        if current_recipient.startswith("functions."):
            rid = random_uuid()
            return [
                ResponseFunctionToolCall(
                    arguments=parser.current_content,
                    call_id=f"call_{rid}",
                    type="function_call",
                    name=current_recipient.split(".")[-1],
                    id=f"fc_{rid}",
                    status="in_progress",
                )
            ]
        # Built-in tools (python, browser, container) should be treated as
        # reasoning, so they fall through to the channel handling below.
        if not current_recipient.startswith(_BUILTIN_RECIPIENT_PREFIXES):
            # All other recipients are MCP calls
            rid = random_uuid()
            server_label, tool_name = _parse_mcp_recipient(current_recipient)
            return [
                McpCall(
                    arguments=parser.current_content,
                    type="mcp_call",
                    name=tool_name,
                    server_label=server_label,
                    id=f"mcp_{rid}",
                    status="in_progress",
                )
            ]

    # Commentary and analysis content are both reported as reasoning.
    if parser.current_channel in ("commentary", "analysis"):
        return [
            ResponseReasoningItem(
                id=f"rs_{random_uuid()}",
                summary=[],
                type="reasoning",
                content=[
                    ResponseReasoningTextContent(
                        text=parser.current_content, type="reasoning_text"
                    )
                ],
                status=None,
            )
        ]

    if parser.current_channel == "final":
        output_text = ResponseOutputText(
            text=parser.current_content,
            annotations=[],  # TODO
            type="output_text",
            logprobs=None,  # TODO
        )
        text_item = ResponseOutputMessage(
            id=f"msg_{random_uuid()}",
            content=[output_text],
            role="assistant",
            # if the parser still has messages (ie if the generator got cut
            # abruptly), this should be incomplete
            status="incomplete",
            type="message",
        )
        return [text_item]

    return []


def get_stop_tokens_for_assistant_actions() -> list[int]:
    """Return the encoding's stop tokens for assistant actions."""
    return get_encoding().stop_tokens_for_assistant_actions()


def get_streamable_parser_for_assistant() -> StreamableParser:
    """Create a ``StreamableParser`` for the assistant role."""
    return StreamableParser(get_encoding(), role=Role.ASSISTANT)


def parse_output_into_messages(token_ids: Iterable[int]) -> StreamableParser:
    """Feed ``token_ids`` through a fresh assistant parser and return it."""
    parser = get_streamable_parser_for_assistant()
    for token_id in token_ids:
        parser.process(token_id)
    return parser


def parse_chat_output(
    token_ids: Sequence[int],
) -> tuple[str | None, str | None, bool]:
    """
    Parse the output of a Harmony chat completion into reasoning and final content.
    Note that when the `openai` tool parser is used, serving_chat only uses this
    for the reasoning content and gets the final content from the tool call parser.

    When the `openai` tool parser is not enabled, or when `GptOssReasoningParser` is
    in use, this needs to return the final content without any tool calls parsed.

    Empty reasoning or final content is returned as None instead of an empty string.

    Returns:
        A ``(reasoning, final_content, is_tool_call)`` tuple; ``is_tool_call``
        is currently always False.
    """
    parser = parse_output_into_messages(token_ids)
    output_msgs = parser.messages
    is_tool_call = False  # TODO: update this when tool call is supported

    # Get completed messages from the parser
    reasoning_texts = [
        msg.content[0].text for msg in output_msgs if msg.channel == "analysis"
    ]
    final_texts = [
        msg.content[0].text for msg in output_msgs if msg.channel != "analysis"
    ]

    # Extract partial messages from the parser
    if parser.current_content:
        if parser.current_channel == "analysis":
            reasoning_texts.append(parser.current_content)
        else:
            final_texts.append(parser.current_content)

    # Flatten multiple messages into a single string.
    # Return None instead of empty string since existing callers check for None
    reasoning: str | None = "\n".join(reasoning_texts) or None
    final_content: str | None = "\n".join(final_texts) or None

    return reasoning, final_content, is_tool_call