From b1c8d4e9f31953560f2db45a3b6e68099ef00c13 Mon Sep 17 00:00:00 2001 From: shangmingc Date: Wed, 28 May 2025 15:40:54 +0800 Subject: [PATCH] [PD] Abort unbootstrapped prefill requests through timeout (#6685) Signed-off-by: Shangming Cai --- .../srt/disaggregation/mooncake/conn.py | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/python/sglang/srt/disaggregation/mooncake/conn.py b/python/sglang/srt/disaggregation/mooncake/conn.py index a03df0700..8ab5066ec 100644 --- a/python/sglang/srt/disaggregation/mooncake/conn.py +++ b/python/sglang/srt/disaggregation/mooncake/conn.py @@ -168,6 +168,9 @@ class MooncakeKVManager(BaseKVManager): min(max(1, cpu_count // 8), 8), ) ) + self.bootstrap_time_out = get_int_env_var( + "SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 30 + ) elif self.disaggregation_mode == DisaggregationMode.DECODE: self.heartbeat_failures = {} self.session_pool = defaultdict(requests.Session) @@ -351,7 +354,7 @@ class MooncakeKVManager(BaseKVManager): if req.mooncake_session_id in self.failed_sessions: self.record_failure( kv_chunk.room, - f"Decode instance could be dead, {req.mooncake_session_id} failed due to multiple errors", + f"Decode instance could be dead, remote mooncake session {req.mooncake_session_id} is not alive", ) self.update_status(kv_chunk.room, KVPoll.Failed) self.sync_status_to_decode_endpoint( @@ -447,7 +450,8 @@ class MooncakeKVManager(BaseKVManager): kv_chunk.room not in self.request_status or self.check_status(kv_chunk.room) == KVPoll.Success ): - self.transfer_infos.pop(kv_chunk.room) + if kv_chunk.room in self.transfer_infos: + self.transfer_infos.pop(kv_chunk.room) except queue.Empty: continue @@ -630,7 +634,8 @@ class MooncakeKVManager(BaseKVManager): possible_affected_rooms = self.addr_to_rooms_tracker.get( failed_bootstrap_addr, [] ) - del self.addr_to_rooms_tracker[failed_bootstrap_addr] + if failed_bootstrap_addr in self.addr_to_rooms_tracker: + del self.addr_to_rooms_tracker[failed_bootstrap_addr] # Report the requests associated with the failed bootstrap addr and mark their status as KVPoll.Failed affected_rooms = [] @@ -660,6 +665,7 @@ class MooncakeKVSender(BaseKVSender): self.kv_mgr.update_status(bootstrap_room, KVPoll.Bootstrapping) self.aux_index = None self.bootstrap_server_url = bootstrap_addr + self.init_time = time.time() self.conclude_state = None # inner state self.curr_idx = 0 @@ -694,13 +700,24 @@ class MooncakeKVSender(BaseKVSender): status = self.kv_mgr.check_status(self.bootstrap_room) if status in (KVPoll.Success, KVPoll.Failed): self.conclude_state = status + elif status == KVPoll.Bootstrapping: + now = time.time() + elapsed = now - self.init_time + if elapsed >= self.kv_mgr.bootstrap_time_out: + self.kv_mgr.record_failure( + self.bootstrap_room, + f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping", + ) + self.conclude_state = KVPoll.Failed + return KVPoll.Failed return status else: return self.conclude_state def clear(self) -> None: - self.kv_mgr.request_status.pop(self.bootstrap_room) + if self.bootstrap_room in self.kv_mgr.request_status: + self.kv_mgr.request_status.pop(self.bootstrap_room) def failure_exception(self): self.clear() @@ -956,7 +973,8 @@ class MooncakeKVReceiver(BaseKVReceiver): return self.conclude_state def clear(self) -> None: - self.kv_mgr.request_status.pop(self.bootstrap_room) + if self.bootstrap_room in self.kv_mgr.request_status: + self.kv_mgr.request_status.pop(self.bootstrap_room) def failure_exception(self): self.clear()