From f66bcdfb2918f63c26e7315d12048fa63a75ec28 Mon Sep 17 00:00:00 2001 From: liziyu <56102866+liziyu179@users.noreply.github.com> Date: Sat, 24 Jan 2026 12:06:42 +0800 Subject: [PATCH] [P/D] Mooncake connector add zmq socket fail log (#6155) Mooncake connector add zmq socket fail log - vLLM version: v0.13.0 - vLLM main: https://github.com/vllm-project/vllm/commit/d68209402ddab3f54a09bc1f4de9a9495a283b60 Signed-off-by: liziyu --- .../kv_transfer/kv_p2p/mooncake_connector.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py index e80717a2..130d1ae1 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py @@ -693,8 +693,8 @@ class KVCacheRecvingThread(threading.Thread): sock: Optional[zmq.Socket] = None # type: ignore try: sock = self._get_remote_socket(remote_host, remote_handshake_port) - ensure_zmq_send(sock, self.encoder.encode((GET_META_MSG, ""))) - metadata_bytes = ensure_zmq_recv(sock, self.remote_poller) + ensure_zmq_send(sock, self.encoder.encode((GET_META_MSG, "")), f"{remote_host}:{remote_handshake_port}") + metadata_bytes = ensure_zmq_recv(sock, self.remote_poller, f"{remote_host}:{remote_handshake_port}") agent_meta = self.decoder.decode(metadata_bytes) engine_id = agent_meta.engine_id assert engine_id != self.local_engine_id, ( @@ -722,9 +722,10 @@ class KVCacheRecvingThread(threading.Thread): sock = self._get_remote_socket(remote_host, remote_handshake_port) data_bytes = self.encoder.encode( (DONE_RECVING_MSG, request_id, remote_port_send_num)) - ensure_zmq_send(sock, data_bytes) + ensure_zmq_send(sock, data_bytes, f"{remote_host}:{remote_handshake_port}") resp = ensure_zmq_recv(sock, self.remote_poller, + f"{remote_host}:{remote_handshake_port}", timeout=self.timeout) logger.debug( f"Received response for request {request_id}: {resp.decode('utf-8')}" @@ -1791,6 +1792,7 @@ def string_to_int64_hash(input_str): def ensure_zmq_send( socket: zmq.Socket, # type: ignore data: bytes, + path: str, max_retries: int = 3): retries_left = max_retries while True: @@ -1806,13 +1808,14 @@ def ensure_zmq_send( time.sleep(0.1) else: logger.error(f"Send failed after all retries: {e}") - raise RuntimeError(f"Failed to send data after {max_retries} " + raise RuntimeError(f"Failed to send data to {path} after {max_retries} " f"retries: {e}") def ensure_zmq_recv( socket: zmq.Socket, # type: ignore poller: zmq.Poller, # type: ignore + path: str, timeout: float = 1.0, max_retries: int = 3) -> bytes: retries_left = max_retries @@ -1830,7 +1833,7 @@ def ensure_zmq_recv( f"({retries_left} attempts left)") time.sleep(0.1) else: - logger.error(f"Receive failed after all retries: {e}") + logger.error(f"Receive failed from {path} after all retries: {e}") raise RuntimeError( f"Failed to receive data after {max_retries} " f"retries: {e}")