[PD] Add guidance for prefill bootstrap timeout (#7846)
Signed-off-by: Shangming Cai <caishangming@linux.alibaba.com>
This commit is contained in:
@@ -185,9 +185,11 @@ class MooncakeKVManager(BaseKVManager):
|
||||
threading.Thread(
|
||||
target=self.transfer_worker, args=(queue, executor), daemon=True
|
||||
).start()
|
||||
|
||||
self.bootstrap_time_out = get_int_env_var(
|
||||
"SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 120
|
||||
# If a timeout happens on the prefill side, it means prefill instances
|
||||
# fail to receive the KV indices from the decode instance of this request.
|
||||
# These timeout requests should be aborted to release the tree cache.
|
||||
self.bootstrap_timeout = get_int_env_var(
|
||||
"SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT", 300
|
||||
)
|
||||
elif self.disaggregation_mode == DisaggregationMode.DECODE:
|
||||
self.heartbeat_failures = {}
|
||||
@@ -209,6 +211,12 @@ class MooncakeKVManager(BaseKVManager):
|
||||
self.connection_pool: Dict[str, Dict[str, Union[str, int]]] = {}
|
||||
self.prefill_tp_size_table: Dict[str, int] = {}
|
||||
self.prefill_dp_size_table: Dict[str, int] = {}
|
||||
# If a timeout happens on the decode side, it means decode instances
|
||||
# fail to receive the KV Cache transfer done signal after bootstrapping.
|
||||
# These timeout requests should be aborted to release the tree cache.
|
||||
self.waiting_timeout = get_int_env_var(
|
||||
"SGLANG_DISAGGREGATION_WAITING_TIMEOUT", 300
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unsupported DisaggregationMode: {self.disaggregation_mode}"
|
||||
@@ -938,7 +946,12 @@ class MooncakeKVSender(BaseKVSender):
|
||||
if self.init_time is not None:
|
||||
now = time.time()
|
||||
elapsed = now - self.init_time
|
||||
if elapsed >= self.kv_mgr.bootstrap_time_out:
|
||||
if elapsed >= self.kv_mgr.bootstrap_timeout:
|
||||
logger.warning_once(
|
||||
"Some requests timed out when bootstrapping, "
|
||||
"which means prefill instances fail to receive the KV indices from the decode instance of this request. "
|
||||
"If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT=600' (10 minutes) to relax the timeout condition. "
|
||||
)
|
||||
self.kv_mgr.record_failure(
|
||||
self.bootstrap_room,
|
||||
f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.Bootstrapping",
|
||||
@@ -987,6 +1000,7 @@ class MooncakeKVReceiver(BaseKVReceiver):
|
||||
self.session_id = self.kv_mgr.get_session_id()
|
||||
self.kv_mgr.update_status(self.bootstrap_room, KVPoll.Bootstrapping)
|
||||
self.conclude_state = None
|
||||
self.init_time = None
|
||||
self.data_parallel_rank = data_parallel_rank
|
||||
|
||||
if self.bootstrap_addr not in self.kv_mgr.prefill_dp_size_table:
|
||||
@@ -1222,14 +1236,31 @@ class MooncakeKVReceiver(BaseKVReceiver):
|
||||
str(self.required_dst_info_num).encode("ascii"),
|
||||
]
|
||||
)
|
||||
self.init_time = time.time()
|
||||
|
||||
def poll(self) -> KVPoll:
|
||||
if self.conclude_state is None:
|
||||
status = self.kv_mgr.check_status(self.bootstrap_room)
|
||||
if status in (KVPoll.Success, KVPoll.Failed):
|
||||
self.conclude_state = status
|
||||
elif status == KVPoll.WaitingForInput:
|
||||
if self.init_time is not None:
|
||||
now = time.time()
|
||||
elapsed = now - self.init_time
|
||||
if elapsed >= self.kv_mgr.waiting_timeout:
|
||||
logger.warning_once(
|
||||
"Some requests fail to receive KV Cache transfer done signal after bootstrapping. "
|
||||
"If a greater mean TTFT is acceptable, you can 'export SGLANG_DISAGGREGATION_WAITING_TIMEOUT=600' (10 minutes) to relax the timeout condition. "
|
||||
)
|
||||
self.kv_mgr.record_failure(
|
||||
self.bootstrap_room,
|
||||
f"Request {self.bootstrap_room} timed out after {elapsed:.1f}s in KVPoll.WaitingForInput",
|
||||
)
|
||||
self.conclude_state = KVPoll.Failed
|
||||
return KVPoll.Failed
|
||||
|
||||
return status
|
||||
|
||||
else:
|
||||
return self.conclude_state
|
||||
|
||||
|
||||
Reference in New Issue
Block a user