[MAIN][BUGFIX] BugFix: Resolve the issue of waiting queue accumulation when requests are canceled. (#2426)

### What this PR does / why we need it?
Resolve the issue of waiting queue accumulation when requests are
canceled.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By ci


- vLLM version: v0.10.1.1
- vLLM main:
006477e60b

---------

Signed-off-by: wangxiaoteng666 <wangxiaoteng@huawei.com>
This commit is contained in:
wangxiaoteng666
2025-08-29 17:19:23 +08:00
committed by GitHub
parent 52aff9e229
commit ee6d141dd4

View File

@@ -87,9 +87,11 @@
import argparse import argparse
import asyncio import asyncio
import functools
import heapq import heapq
import os import os
import sys import sys
import uuid
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import List from typing import List
@@ -137,7 +139,6 @@ class ProxyState:
] ]
self.req_to_prefiller = {} self.req_to_prefiller = {}
self.req_id_lock = asyncio.Lock() self.req_id_lock = asyncio.Lock()
self.req_id_counter = 0
# Removed selection locks - no longer needed for synchronous methods # Removed selection locks - no longer needed for synchronous methods
# Initialize priority queues for efficient server selection # Initialize priority queues for efficient server selection
@@ -193,8 +194,7 @@ class ProxyState:
async def next_req_id(self): async def next_req_id(self):
async with self.req_id_lock: async with self.req_id_lock:
self.req_id_counter += 1 return str(uuid.uuid4())
return str(self.req_id_counter)
def select_prefiller(self, token_count): # Changed to synchronous def select_prefiller(self, token_count): # Changed to synchronous
# No lock needed - entire function is atomic # No lock needed - entire function is atomic
@@ -313,6 +313,32 @@ async def lifespan(app: FastAPI):
await d.client.aclose() await d.client.aclose()
async def listen_for_disconnect(request: Request) -> None:
"""Return if a disconnect message is received"""
while True:
message = await request.receive()
if message["type"] == "http.disconnect":
break
def with_cancellation(handler_func):
@functools.wraps(handler_func)
async def wrapper(*args, **kwargs):
request = kwargs["request"]
handler_task = asyncio.create_task(handler_func(*args, **kwargs))
cancellation_task = asyncio.create_task(listen_for_disconnect(request))
done, pending = await asyncio.wait([handler_task, cancellation_task],
return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
if handler_task in done:
return handler_task.result()
return None
return wrapper
app = FastAPI(lifespan=lifespan) app = FastAPI(lifespan=lifespan)
@@ -493,11 +519,13 @@ async def _handle_completions(api: str, request: Request):
@app.post("/v1/completions") @app.post("/v1/completions")
@with_cancellation
async def handle_completions(request: Request): async def handle_completions(request: Request):
return await _handle_completions("/completions", request) return await _handle_completions("/completions", request)
@app.post("/v1/chat/completions") @app.post("/v1/chat/completions")
@with_cancellation
async def handle_chat_completions(request: Request): async def handle_chat_completions(request: Request):
return await _handle_completions("/chat/completions", request) return await _handle_completions("/chat/completions", request)