[P/D] Mooncake connector add zmq socket fail log (#6155)
Add the remote endpoint (host:port) to the Mooncake connector's ZMQ socket send/receive failure logs and error messages.
- vLLM version: v0.13.0
- vLLM main: d68209402d
Signed-off-by: liziyu <liziyu16@huawei.com>
This commit is contained in:
@@ -693,8 +693,8 @@ class KVCacheRecvingThread(threading.Thread):
|
|||||||
sock: Optional[zmq.Socket] = None # type: ignore
|
sock: Optional[zmq.Socket] = None # type: ignore
|
||||||
try:
|
try:
|
||||||
sock = self._get_remote_socket(remote_host, remote_handshake_port)
|
sock = self._get_remote_socket(remote_host, remote_handshake_port)
|
||||||
ensure_zmq_send(sock, self.encoder.encode((GET_META_MSG, "")))
|
ensure_zmq_send(sock, self.encoder.encode((GET_META_MSG, "")), f"{remote_host}:{remote_handshake_port}")
|
||||||
metadata_bytes = ensure_zmq_recv(sock, self.remote_poller)
|
metadata_bytes = ensure_zmq_recv(sock, self.remote_poller, f"{remote_host}:{remote_handshake_port}")
|
||||||
agent_meta = self.decoder.decode(metadata_bytes)
|
agent_meta = self.decoder.decode(metadata_bytes)
|
||||||
engine_id = agent_meta.engine_id
|
engine_id = agent_meta.engine_id
|
||||||
assert engine_id != self.local_engine_id, (
|
assert engine_id != self.local_engine_id, (
|
||||||
@@ -722,9 +722,10 @@ class KVCacheRecvingThread(threading.Thread):
|
|||||||
sock = self._get_remote_socket(remote_host, remote_handshake_port)
|
sock = self._get_remote_socket(remote_host, remote_handshake_port)
|
||||||
data_bytes = self.encoder.encode(
|
data_bytes = self.encoder.encode(
|
||||||
(DONE_RECVING_MSG, request_id, remote_port_send_num))
|
(DONE_RECVING_MSG, request_id, remote_port_send_num))
|
||||||
ensure_zmq_send(sock, data_bytes)
|
ensure_zmq_send(sock, data_bytes, f"{remote_host}:{remote_handshake_port}")
|
||||||
resp = ensure_zmq_recv(sock,
|
resp = ensure_zmq_recv(sock,
|
||||||
self.remote_poller,
|
self.remote_poller,
|
||||||
|
f"{remote_host}:{remote_handshake_port}",
|
||||||
timeout=self.timeout)
|
timeout=self.timeout)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Received response for request {request_id}: {resp.decode('utf-8')}"
|
f"Received response for request {request_id}: {resp.decode('utf-8')}"
|
||||||
@@ -1791,6 +1792,7 @@ def string_to_int64_hash(input_str):
|
|||||||
def ensure_zmq_send(
|
def ensure_zmq_send(
|
||||||
socket: zmq.Socket, # type: ignore
|
socket: zmq.Socket, # type: ignore
|
||||||
data: bytes,
|
data: bytes,
|
||||||
|
path: str,
|
||||||
max_retries: int = 3):
|
max_retries: int = 3):
|
||||||
retries_left = max_retries
|
retries_left = max_retries
|
||||||
while True:
|
while True:
|
||||||
@@ -1806,13 +1808,14 @@ def ensure_zmq_send(
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
else:
|
else:
|
||||||
logger.error(f"Send failed after all retries: {e}")
|
logger.error(f"Send failed after all retries: {e}")
|
||||||
raise RuntimeError(f"Failed to send data after {max_retries} "
|
raise RuntimeError(f"Failed to send data to {path} after {max_retries} "
|
||||||
f"retries: {e}")
|
f"retries: {e}")
|
||||||
|
|
||||||
|
|
||||||
def ensure_zmq_recv(
|
def ensure_zmq_recv(
|
||||||
socket: zmq.Socket, # type: ignore
|
socket: zmq.Socket, # type: ignore
|
||||||
poller: zmq.Poller, # type: ignore
|
poller: zmq.Poller, # type: ignore
|
||||||
|
path: str,
|
||||||
timeout: float = 1.0,
|
timeout: float = 1.0,
|
||||||
max_retries: int = 3) -> bytes:
|
max_retries: int = 3) -> bytes:
|
||||||
retries_left = max_retries
|
retries_left = max_retries
|
||||||
@@ -1830,7 +1833,7 @@ def ensure_zmq_recv(
|
|||||||
f"({retries_left} attempts left)")
|
f"({retries_left} attempts left)")
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
else:
|
else:
|
||||||
logger.error(f"Receive failed after all retries: {e}")
|
logger.error(f"Receive failed from {path} after all retries: {e}")
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Failed to receive data after {max_retries} "
|
f"Failed to receive data after {max_retries} "
|
||||||
f"retries: {e}")
|
f"retries: {e}")
|
||||||
|
|||||||
Reference in New Issue
Block a user