[P/D] Bugfix zmq send/receive failed (#5503)

### What this PR does / why we need it?
Currently, when the MooncakeConnector interacts via ZeroMQ, it throws
the following exception upon send/receive failure:
**Issue 1:** The currently used `zmq.REQ` socket follows a strict
request-reply pattern, requiring an alternating sequence of send →
receive → send → receive... If either a send() or receive() operation
fails, the ZeroMQ socket becomes unusable.
**Solution:** When a send() or receive() exception occurs, close and
delete the ZeroMQ socket, and recreate it upon next use.

**Issue 2:** In `_handle_request`, if `_send_done_recv_signal` raises an
exception, the exception is thrown immediately and subsequent code is
not executed, causing the decode logic to fail to properly release the
request.
**Solution:** Move the call to `_send_done_recv_signal` to the end of
the function.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.13.0
- vLLM main:
45c1ca1ca1

Signed-off-by: LCAIZJ <leichao139636@163.com>
This commit is contained in:
Chao Lei
2025-12-31 19:17:08 +08:00
committed by GitHub
parent 80fc0f5b9e
commit d193316ded

View File

@@ -417,6 +417,11 @@ class KVCacheRecvingThread(threading.Thread):
f"{request_id}: {e}", f"{request_id}: {e}",
exc_info=True) exc_info=True)
finally: finally:
if all_task_done:
self.task_tracker.update_done_task_count(request_id)
if request_id in self.proc_not_transfer_request:
del self.proc_not_transfer_request[request_id]
self.request_queue.task_done()
# Always send the done signal to the remote host to ensure proper # Always send the done signal to the remote host to ensure proper
# resource cleanup. Failing to do so may cause a memory leak on the # resource cleanup. Failing to do so may cause a memory leak on the
# remote host. # remote host.
@@ -425,11 +430,6 @@ class KVCacheRecvingThread(threading.Thread):
remote_port_send_num) remote_port_send_num)
self._send_done_signal_to_free_remote_port(request_id, remote_host, self._send_done_signal_to_free_remote_port(request_id, remote_host,
remote_port_send_num) remote_port_send_num)
if all_task_done:
self.task_tracker.update_done_task_count(request_id)
if request_id in self.proc_not_transfer_request:
del self.proc_not_transfer_request[request_id]
self.request_queue.task_done()
def _send_done_signal_to_free_remote_port(self, request_id, remote_host, def _send_done_signal_to_free_remote_port(self, request_id, remote_host,
remote_port_send_num): remote_port_send_num):
@@ -698,6 +698,13 @@ class KVCacheRecvingThread(threading.Thread):
request_id, remote_host, remote_handshake_port) request_id, remote_host, remote_handshake_port)
raise RuntimeError( raise RuntimeError(
f"Failed to receive ACK, resp: {resp.decode('utf-8')}") f"Failed to receive ACK, resp: {resp.decode('utf-8')}")
except RuntimeError as e:
if isinstance(sock, zmq.Socket): # type: ignore
sock.close()
sock = None
logger.warning(
f"Unexpected error occurred in socket, {e}, closing the original channel"
)
finally: finally:
if sock is not None: if sock is not None:
self._return_remote_socket(sock, remote_host, self._return_remote_socket(sock, remote_host,