From 64c5907e12b474d5a4ade3982172ede5188405dc Mon Sep 17 00:00:00 2001 From: Shangming Cai Date: Wed, 9 Jul 2025 12:00:34 +0800 Subject: [PATCH] [PD] Add guidance for prefill bootstrap timeout (#7846) Signed-off-by: Shangming Cai --- docs/backend/pd_disaggregation.md | 8 +++- .../srt/disaggregation/mooncake/conn.py | 39 +++++++++++++++++-- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/docs/backend/pd_disaggregation.md b/docs/backend/pd_disaggregation.md index c0f8e34d1..43ad0547e 100644 --- a/docs/backend/pd_disaggregation.md +++ b/docs/backend/pd_disaggregation.md @@ -56,13 +56,19 @@ PD Disaggregation with Mooncake supports the following environment variables for |:--------:|:-----------:|:--------: | **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KVCache transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions | | **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the number of parallel transfer queues. KVCache transfer requests from multiple decode instances will be sharded into these queues so that they can share the threads and the transfer bandwidth at the same time. If it is set to `1`, then we transfer requests one by one according to fcfs strategy | `4` | -| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `120` | +| **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `300` | + +If a greater mean TTFT is acceptable, you can `export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600` (10 minutes) to relax the timeout condition. +Please be aware that this setting will cause prefill instances to take a longer time to clean up the affected memory resources when a running decode node loses connection. #### Decode Server Configuration | Variable | Description | Default | |:--------:|:-----------:|:--------: | **`SGLANG_DISAGGREGATION_HEARTBEAT_INTERVAL`** | Interval (seconds) between health checks to prefill bootstrap servers | `5.0` | | **`SGLANG_DISAGGREGATION_HEARTBEAT_MAX_FAILURE`** | Consecutive heartbeat failures before marking prefill server offline | `2` | +| **`SGLANG_DISAGGREGATION_WAITING_TIMEOUT`** | Timeout (seconds) for receiving KV Cache after request initialization | `300` | + +If a greater mean TTFT is acceptable, you can `export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600` (10 minutes) to relax the timeout condition. ## NIXL diff --git a/python/sglang/srt/disaggregation/mooncake/conn.py b/python/sglang/srt/disaggregation/mooncake/conn.py index 8300a71fb..13458ba64 100644 --- a/python/sglang/srt/disaggregation/mooncake/conn.py +++ b/python/sglang/srt/disaggregation/mooncake/conn.py @@ -185,9 +185,11 @@ class MooncakeKVManager(BaseKVManager): threading.Thread( target=self.transfer_worker, args=(queue, executor), daemon=True ).start() - - self.bootstrap_time_out = get_int_env_var( - "SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 120 + # If a timeout happens on the prefill side, it means prefill instances + # fail to receive the KV indices from the decode instance of this request. + # These timeout requests should be aborted to release the tree cache. + self.bootstrap_timeout = get_int_env_var( + "SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 300 ) elif self.disaggregation_mode == DisaggregationMode.DECODE: self.heartbeat_failures = {} @@ -209,6 +211,12 @@ class MooncakeKVManager(BaseKVManager): self.connection_pool: Dict[str, Dict[str, Union[str, int]]] = {} self.prefill_tp_size_table: Dict[str, int] = {} self.prefill_dp_size_table: Dict[str, int] = {} + # If a timeout happens on the decode side, it means decode instances + # fail to receive the KV Cache transfer done signal after bootstrapping. + # These timeout requests should be aborted to release the tree cache. + self.waiting_timeout = get_int_env_var( + "SGLANG_DISAGGREGATION_WAITING_TIMEOUT", 300 + ) else: raise ValueError( f"Unsupported DisaggregationMode: {self.disaggregation_mode}" @@ -938,7 +946,12 @@ class MooncakeKVSender(BaseKVSender): if self.init_time is not None: now = time.time() elapsed = now - self.init_time - if elapsed >= self.kv_mgr.bootstrap_time_out: + if elapsed >= self.kv_mgr.bootstrap_timeout: + logger.warning_once( + "Some requests timed out when bootstrapping, " + "which means prefill instances fail to receive the KV indices from the decode instance of this request. " + "If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600' (10 minutes) to relax the timeout condition. " + ) self.kv_mgr.record_failure( self.bootstrap_room, f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping", @@ -987,6 +1000,7 @@ class MooncakeKVReceiver(BaseKVReceiver): self.session_id = self.kv_mgr.get_session_id() self.kv_mgr.update_status(self.bootstrap_room, KVPoll.Bootstrapping) self.conclude_state = None + self.init_time = None self.data_parallel_rank = data_parallel_rank if self.bootstrap_addr not in self.kv_mgr.prefill_dp_size_table: @@ -1222,14 +1236,31 @@ class MooncakeKVReceiver(BaseKVReceiver): str(self.required_dst_info_num).encode("ascii"), ] ) + self.init_time = time.time() def poll(self) -> KVPoll: if self.conclude_state is None: status = self.kv_mgr.check_status(self.bootstrap_room) if status in (KVPoll.Success, KVPoll.Failed): self.conclude_state = status + elif status == KVPoll.WaitingForInput: + if self.init_time is not None: + now = time.time() + elapsed = now - self.init_time + if elapsed >= self.kv_mgr.waiting_timeout: + logger.warning_once( + "Some requests fail to receive KV Cache transfer done signal after bootstrapping. " + "If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600' (10 minutes) to relax the timeout condition. " + ) + self.kv_mgr.record_failure( + self.bootstrap_room, + f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.WaitingForInput", + ) + self.conclude_state = KVPoll.Failed + return KVPoll.Failed return status + else: return self.conclude_state