Refine OpenAI serving entrypoint to remove batch requests (#7372)

Signed-off-by: Xinyuan Tong <justinning0323@outlook.com>
Co-authored-by: Chang Su <csu272@usc.edu>
This commit is contained in:
Xinyuan Tong
2025-06-20 14:33:43 -07:00
committed by GitHub
parent 794be55af2
commit 0998808009
8 changed files with 488 additions and 645 deletions

View File

@@ -3,7 +3,7 @@ import json
import logging
import time
import uuid
from typing import Any, Dict, List, Optional, Union
from typing import Any, AsyncGenerator, Dict, List, Optional, Union
from fastapi import Request
from fastapi.responses import StreamingResponse
@@ -52,137 +52,56 @@ class OpenAIServingChat(OpenAIServingBase):
def _request_id_prefix(self) -> str:
return "chatcmpl-"
def _validate_request(self, request: ChatCompletionRequest) -> Optional[str]:
"""Validate chat messages format and content"""
if not (messages := request.messages):
return "Messages cannot be empty"
# Check for alternating user/assistant pattern (optional validation)
roles = [msg.role for msg in messages]
# First message should typically be from user or system
if roles[0] not in ["user", "system"]:
return "First message should be from 'user' or 'system'"
# Check for consecutive assistant messages (which might indicate an error)
for i in range(1, len(roles)):
if roles[i] == "assistant" and roles[i - 1] == "assistant":
# This is actually allowed in some cases, so just warn
pass
# Validate message content
for i, msg in enumerate(messages):
if msg.role == "user":
if not msg.content:
return f"User message at index {i} has no content"
elif msg.role == "assistant":
# Assistant messages can have no content if they have tool_calls
if not msg.content and not getattr(msg, "tool_calls", None):
return (
f"Assistant message at index {i} has no content or tool calls"
)
return None
def _convert_to_internal_request(
self,
all_requests: List[ChatCompletionRequest],
request_ids: List[str],
) -> tuple[
GenerateReqInput, Union[ChatCompletionRequest, List[ChatCompletionRequest]]
]:
request: ChatCompletionRequest,
) -> tuple[GenerateReqInput, ChatCompletionRequest]:
"""Convert OpenAI chat completion request to internal format"""
input_ids = []
prompts = []
sampling_params_list = []
image_data_list = []
audio_data_list = []
return_logprobs = []
logprob_start_lens = []
top_logprobs_nums = []
modalities_list = []
lora_paths = []
is_multimodal = self.tokenizer_manager.model_config.is_multimodal
for request in all_requests:
# Process messages and apply chat template
(
prompt,
prompt_ids,
image_data,
audio_data,
modalities,
stop,
tool_call_constraint,
) = self._process_messages(request, is_multimodal)
# Process messages and apply chat template
(
prompt,
prompt_ids,
image_data,
audio_data,
modalities,
stop,
tool_call_constraint,
) = self._process_messages(request, is_multimodal)
input_ids.append(prompt_ids)
prompts.append(prompt)
return_logprobs.append(request.logprobs)
logprob_start_lens.append(-1)
top_logprobs_nums.append(request.top_logprobs or 0)
lora_paths.append(request.lora_path)
# Build sampling parameters
sampling_params = self._build_sampling_params(
request, stop, tool_call_constraint
)
sampling_params_list.append(sampling_params)
image_data_list.append(image_data)
audio_data_list.append(audio_data)
modalities_list.append(modalities)
# Build sampling parameters
sampling_params = self._build_sampling_params(
request, stop, tool_call_constraint
)
# Handle single vs multiple requests
if len(all_requests) == 1:
if is_multimodal:
prompt_kwargs = {"text": prompts[0]}
else:
if isinstance(input_ids[0], str):
prompt_kwargs = {"text": input_ids[0]}
else:
prompt_kwargs = {"input_ids": input_ids[0]}
sampling_params_list = sampling_params_list[0]
image_data_list = image_data_list[0]
audio_data_list = audio_data_list[0]
return_logprobs = return_logprobs[0]
logprob_start_lens = logprob_start_lens[0]
top_logprobs_nums = top_logprobs_nums[0]
modalities_list = modalities_list[0]
lora_paths = lora_paths[0]
request_ids = request_ids[0]
if is_multimodal:
prompt_kwargs = {"text": prompt}
else:
if is_multimodal:
prompt_kwargs = {"text": prompts}
if isinstance(prompt_ids, str):
prompt_kwargs = {"text": prompt_ids}
else:
if isinstance(input_ids[0], str):
prompt_kwargs = {"text": input_ids}
else:
prompt_kwargs = {"input_ids": input_ids}
prompt_kwargs = {"input_ids": prompt_ids}
adapted_request = GenerateReqInput(
**prompt_kwargs,
image_data=image_data_list,
audio_data=audio_data_list,
sampling_params=sampling_params_list,
return_logprob=return_logprobs,
logprob_start_len=logprob_start_lens,
top_logprobs_num=top_logprobs_nums,
stream=all_requests[0].stream,
image_data=image_data,
audio_data=audio_data,
sampling_params=sampling_params,
return_logprob=request.logprobs,
logprob_start_len=-1,
top_logprobs_num=request.top_logprobs or 0,
stream=request.stream,
return_text_in_logprobs=True,
rid=request_ids,
modalities=modalities_list,
lora_path=lora_paths,
bootstrap_host=all_requests[0].bootstrap_host,
bootstrap_port=all_requests[0].bootstrap_port,
bootstrap_room=all_requests[0].bootstrap_room,
modalities=modalities,
lora_path=request.lora_path,
bootstrap_host=request.bootstrap_host,
bootstrap_port=request.bootstrap_port,
bootstrap_room=request.bootstrap_room,
)
return adapted_request, (
all_requests if len(all_requests) > 1 else all_requests[0]
)
return adapted_request, request
def _process_messages(
self, request: ChatCompletionRequest, is_multimodal: bool
@@ -457,55 +376,138 @@ class OpenAIServingChat(OpenAIServingBase):
raw_request: Request,
) -> StreamingResponse:
"""Handle streaming chat completion request"""
return StreamingResponse(
self._generate_chat_stream(adapted_request, request, raw_request),
media_type="text/event-stream",
background=self.tokenizer_manager.create_abort_task(adapted_request),
)
async def generate_stream_resp():
parser_dict = {}
reasoning_parser_dict = {}
tool_call_first = True
is_firsts = {}
stream_buffers = {}
n_prev_tokens = {}
prompt_tokens = {}
completion_tokens = {}
cached_tokens = {}
async def _generate_chat_stream(
self,
adapted_request: GenerateReqInput,
request: ChatCompletionRequest,
raw_request: Request,
) -> AsyncGenerator[str, None]:
"""Generate streaming chat completion response"""
# Parsers for tool calls and reasoning
parser_dict = {}
reasoning_parser_dict = {}
try:
async for content in self.tokenizer_manager.generate_request(
adapted_request, raw_request
):
index = content.get("index", 0)
# State tracking for streaming
is_firsts = {}
stream_buffers = {}
n_prev_tokens = {}
is_first = is_firsts.get(index, True)
stream_buffer = stream_buffers.get(index, "")
n_prev_token = n_prev_tokens.get(index, 0)
# Usage tracking
prompt_tokens = {}
completion_tokens = {}
cached_tokens = {}
prompt_tokens[index] = content["meta_info"]["prompt_tokens"]
completion_tokens[index] = content["meta_info"]["completion_tokens"]
cached_tokens[index] = content["meta_info"].get("cached_tokens", 0)
try:
async for content in self.tokenizer_manager.generate_request(
adapted_request, raw_request
):
index = content.get("index", 0)
# Handle logprobs
choice_logprobs = None
if request.logprobs:
choice_logprobs = self._process_streaming_logprobs(
content, n_prev_token
)
n_prev_token = len(
content["meta_info"]["output_token_logprobs"]
)
prompt_tokens[index] = content["meta_info"]["prompt_tokens"]
completion_tokens[index] = content["meta_info"]["completion_tokens"]
cached_tokens[index] = content["meta_info"].get("cached_tokens", 0)
finish_reason = content["meta_info"]["finish_reason"]
finish_reason_type = (
finish_reason["type"] if finish_reason else None
# Handle logprobs
choice_logprobs = None
if request.logprobs:
choice_logprobs = self._process_streaming_logprobs(
content, n_prev_tokens.get(index, 0)
)
n_prev_tokens[index] = len(
content["meta_info"]["output_token_logprobs"]
)
# First chunk with role
if is_first:
is_first = False
delta = DeltaMessage(role="assistant")
finish_reason = content["meta_info"]["finish_reason"]
finish_reason_type = finish_reason["type"] if finish_reason else None
# First chunk with role
if is_firsts.get(index, True):
is_firsts[index] = False
delta = DeltaMessage(role="assistant", content="")
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=delta,
finish_reason=finish_reason_type,
matched_stop=(
finish_reason["matched"]
if finish_reason and "matched" in finish_reason
else None
),
logprobs=choice_logprobs,
)
chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[choice_data],
model=request.model,
)
yield f"data: {chunk.model_dump_json()}\n\n"
# Process content delta
stream_buffer = stream_buffers.get(index, "")
delta = content["text"][len(stream_buffer) :]
stream_buffers[index] = stream_buffer + delta
# Handle reasoning content
enable_thinking = getattr(request, "chat_template_kwargs", {}).get(
"enable_thinking", True
)
if (
self.tokenizer_manager.server_args.reasoning_parser
and request.separate_reasoning
and enable_thinking
):
reasoning_text, delta = self._process_reasoning_stream(
index, delta, reasoning_parser_dict, content, request
)
if reasoning_text:
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=delta,
delta=DeltaMessage(reasoning_content=reasoning_text),
finish_reason=finish_reason_type,
)
chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[choice_data],
model=request.model,
)
yield f"data: {chunk.model_dump_json()}\n\n"
if not delta:
continue
# Handle tool calls
if request.tool_choice != "none" and request.tools:
async for chunk in self._process_tool_call_stream(
index,
delta,
parser_dict,
content,
request,
finish_reason_type,
):
yield chunk
else:
# Regular content
if delta or not (
request.stream_options and request.stream_options.include_usage
):
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(content=delta if delta else None),
finish_reason=(
None
if request.stream_options
and request.stream_options.include_usage
else finish_reason_type
),
matched_stop=(
finish_reason["matched"]
if finish_reason and "matched" in finish_reason
@@ -521,121 +523,49 @@ class OpenAIServingChat(OpenAIServingBase):
)
yield f"data: {chunk.model_dump_json()}\n\n"
# Process content delta
delta = content["text"][len(stream_buffer) :]
new_stream_buffer = stream_buffer + delta
# Handle reasoning content
enable_thinking = getattr(request, "chat_template_kwargs", {}).get(
"enable_thinking", True
# Final chunk with finish_reason
finish_reason_chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[
ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(),
finish_reason=finish_reason_type,
matched_stop=(
finish_reason["matched"]
if finish_reason and "matched" in finish_reason
else None
),
)
if (
self.tokenizer_manager.server_args.reasoning_parser
and request.separate_reasoning
and enable_thinking
):
reasoning_text, delta = self._process_reasoning_stream(
index, delta, reasoning_parser_dict, content, request
)
if reasoning_text:
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(reasoning_content=reasoning_text),
finish_reason=finish_reason_type,
)
chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[choice_data],
model=request.model,
)
yield f"data: {chunk.model_dump_json()}\n\n"
],
model=request.model,
usage=None,
)
yield f"data: {finish_reason_chunk.model_dump_json()}\n\n"
if not delta:
stream_buffers[index] = new_stream_buffer
is_firsts[index] = is_first
n_prev_tokens[index] = n_prev_token
continue
# Handle tool calls
if request.tool_choice != "none" and request.tools:
async for chunk in self._process_tool_call_stream(
index,
delta,
parser_dict,
content,
request,
finish_reason_type,
):
yield chunk
else:
# Regular content
if delta or not (
request.stream_options
and request.stream_options.include_usage
):
choice_data = ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(content=delta if delta else None),
finish_reason=(
None
if request.stream_options
and request.stream_options.include_usage
else finish_reason_type
),
matched_stop=(
finish_reason["matched"]
if finish_reason and "matched" in finish_reason
else None
),
logprobs=choice_logprobs,
)
chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[choice_data],
model=request.model,
)
yield f"data: {chunk.model_dump_json()}\n\n"
stream_buffers[index] = new_stream_buffer
is_firsts[index] = is_first
n_prev_tokens[index] = n_prev_token
# Final chunk with usage
if request.stream_options and request.stream_options.include_usage:
usage = self._calculate_streaming_usage_base(
prompt_tokens, completion_tokens, cached_tokens, request.n
)
else:
usage = None
final_chunk = ChatCompletionStreamResponse(
# Additional usage chunk
if request.stream_options and request.stream_options.include_usage:
usage = self._calculate_streaming_usage_base(
prompt_tokens,
completion_tokens,
cached_tokens,
request.n,
)
usage_chunk = ChatCompletionStreamResponse(
id=content["meta_info"]["id"],
created=int(time.time()),
choices=[
ChatCompletionResponseStreamChoice(
index=index,
delta=DeltaMessage(),
finish_reason=finish_reason_type,
)
],
choices=[], # Empty choices array as per OpenAI spec
model=request.model,
usage=usage,
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield f"data: {usage_chunk.model_dump_json()}\n\n"
except Exception as e:
error = self.create_streaming_error_response(str(e))
yield f"data: {error}\n\n"
except Exception as e:
error = self.create_streaming_error_response(str(e))
yield f"data: {error}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate_stream_resp(),
media_type="text/event-stream",
background=self.tokenizer_manager.create_abort_task(adapted_request),
)
yield "data: [DONE]\n\n"
async def _handle_non_streaming_request(
self,
@@ -658,9 +588,6 @@ class OpenAIServingChat(OpenAIServingBase):
request,
ret,
int(time.time()),
cache_report=self.tokenizer_manager.server_args.enable_cache_report,
tool_call_parser=self.tokenizer_manager.server_args.tool_call_parser,
reasoning_parser=self.tokenizer_manager.server_args.reasoning_parser,
)
return response
@@ -670,9 +597,6 @@ class OpenAIServingChat(OpenAIServingBase):
request: ChatCompletionRequest,
ret: List[Dict[str, Any]],
created: int,
cache_report: bool = False,
tool_call_parser: Optional[str] = None,
reasoning_parser: Optional[str] = None,
) -> ChatCompletionResponse:
"""Build chat completion response from generation results"""
choices = []
@@ -691,6 +615,7 @@ class OpenAIServingChat(OpenAIServingBase):
enable_thinking = getattr(request, "chat_template_kwargs", {}).get(
"enable_thinking", True
)
reasoning_parser = self.tokenizer_manager.server_args.reasoning_parser
if reasoning_parser and request.separate_reasoning and enable_thinking:
try:
parser = ReasoningParser(
@@ -708,6 +633,7 @@ class OpenAIServingChat(OpenAIServingBase):
# Handle tool calls
tool_calls = None
if request.tool_choice != "none" and request.tools:
tool_call_parser = self.tokenizer_manager.server_args.tool_call_parser
tool_calls, text, finish_reason = self._process_tool_calls(
text, request.tools, tool_call_parser, finish_reason
)
@@ -731,6 +657,7 @@ class OpenAIServingChat(OpenAIServingBase):
choices.append(choice_data)
# Calculate usage
cache_report = self.tokenizer_manager.server_args.enable_cache_report
usage = aggregate_token_usage(ret, request.n, cache_report)
return ChatCompletionResponse(
@@ -810,7 +737,7 @@ class OpenAIServingChat(OpenAIServingBase):
text, call_info_list = parser.parse_non_stream(text)
tool_calls = [
ToolCall(
id=f"call_{base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b'=').decode()}",
id=f"call_{uuid.uuid4().hex[:24]}",
function=FunctionResponse(
name=call_info.name, arguments=call_info.parameters
),
@@ -894,6 +821,16 @@ class OpenAIServingChat(OpenAIServingBase):
# Yield tool calls
for call_item in calls:
# Tool call ID should be generated only once per tool call
if call_item.name:
# First chunk: include ID and function name
tool_call_id = f"call_{uuid.uuid4().hex[:24]}"
function_name = call_item.name
else:
# Subsequent chunks: null ID and name for argument deltas
tool_call_id = None
function_name = None
if finish_reason_type == "stop":
# Handle remaining arguments
latest_delta_len = 0
@@ -912,10 +849,10 @@ class OpenAIServingChat(OpenAIServingBase):
finish_reason_type = "tool_calls"
tool_call = ToolCall(
id=f"call_{base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b'=').decode()}",
id=tool_call_id,
index=call_item.tool_index,
function=FunctionResponse(
name=call_item.name,
name=function_name,
arguments=call_item.parameters,
),
)