From d193316dedd361e56be912ef7868fd3919b05a42 Mon Sep 17 00:00:00 2001 From: Chao Lei Date: Wed, 31 Dec 2025 19:17:08 +0800 Subject: [PATCH] [P/D] Bugfix zmq send/receive failed (#5503) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What this PR does / why we need it? Currently, when the MooncakeConnector interacts via ZeroMQ, the following two issues occur when a send or receive operation fails: **Issue 1:** The currently used `zmq.REQ` socket follows a strict request-reply pattern, requiring an alternating sequence of send → receive → send → receive... If either a send() or receive() operation fails, the ZeroMQ socket becomes unusable. **Solution:** When a send() or receive() exception occurs, close and delete the ZeroMQ socket, and recreate it upon next use. **Issue 2:** In `_handle_request`, if `_send_done_recv_signal` raises an exception, the exception is thrown immediately and subsequent code is not executed, causing the decode logic to fail to properly release the request. **Solution:** Move the call to `_send_done_recv_signal` to the end of the function. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? 
- vLLM version: v0.13.0 - vLLM main: https://github.com/vllm-project/vllm/commit/45c1ca1ca1ee8fa06df263c8715e8a412ff408d4 Signed-off-by: LCAIZJ --- vllm_ascend/distributed/mooncake_connector.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/vllm_ascend/distributed/mooncake_connector.py b/vllm_ascend/distributed/mooncake_connector.py index 6cacef83..603a89b8 100644 --- a/vllm_ascend/distributed/mooncake_connector.py +++ b/vllm_ascend/distributed/mooncake_connector.py @@ -417,6 +417,11 @@ class KVCacheRecvingThread(threading.Thread): f"{request_id}: {e}", exc_info=True) finally: + if all_task_done: + self.task_tracker.update_done_task_count(request_id) + if request_id in self.proc_not_transfer_request: + del self.proc_not_transfer_request[request_id] + self.request_queue.task_done() # Always send the done signal to the remote host to ensure proper # resource cleanup. Failing to do so may cause a memory leak on the # remote host. @@ -425,11 +430,6 @@ class KVCacheRecvingThread(threading.Thread): remote_port_send_num) self._send_done_signal_to_free_remote_port(request_id, remote_host, remote_port_send_num) - if all_task_done: - self.task_tracker.update_done_task_count(request_id) - if request_id in self.proc_not_transfer_request: - del self.proc_not_transfer_request[request_id] - self.request_queue.task_done() def _send_done_signal_to_free_remote_port(self, request_id, remote_host, remote_port_send_num): @@ -698,6 +698,13 @@ class KVCacheRecvingThread(threading.Thread): request_id, remote_host, remote_handshake_port) raise RuntimeError( f"Failed to receive ACK, resp: {resp.decode('utf-8')}") + except RuntimeError as e: + if isinstance(sock, zmq.Socket): # type: ignore + sock.close() + sock = None + logger.warning( + f"Unexpected error occurred in socket, {e}, closing the original channel" + ) finally: if sock is not None: self._return_remote_socket(sock, remote_host,