From dd1012fcbe2a1fb36c44e10c16f8d0bcd8e9da25 Mon Sep 17 00:00:00 2001 From: shangmingc Date: Fri, 6 Jun 2025 01:56:02 +0800 Subject: [PATCH] [PD] Fix potential perf spike caused by tracker gc and optimize doc (#6764) Signed-off-by: Shangming Cai --- docs/backend/pd_disaggregation.md | 4 ++-- .../sglang/srt/disaggregation/mooncake/conn.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/backend/pd_disaggregation.md b/docs/backend/pd_disaggregation.md index 9dbc2705d..833f0b3f9 100644 --- a/docs/backend/pd_disaggregation.md +++ b/docs/backend/pd_disaggregation.md @@ -54,8 +54,8 @@ PD Disaggregation with Mooncake supports the following environment variables for #### Prefill Server Configuration | Variable | Description | Default | |:--------:|:-----------:|:--------: -| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KV transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions | -| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the maximum pending tasks in the parallel transfer queue | `4` | +| **`SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`** | Controls the total number of worker threads for KVCache transfer operations per TP rank | A dynamic value calculated by `int(0.75 * os.cpu_count()) // 8)`, which is limited to be larger than 4 and less than 12 to ensure efficiency and prevent thread race conditions | +| **`SGLANG_DISAGGREGATION_QUEUE_SIZE`** | Sets the number of parallel transfer queues. KVCache transfer requests from multiple decode instances will be sharded into these queues so that they can share the threads and the transfer bandwidth at the same time. If it is set to `1`, then we transfer requests one by one according to fcfs strategy | `4` | | **`SGLANG_DISAGGREGATION_BOOTSTRAP_TIMEOUT`** | Timeout (seconds) for receiving destination KV indices during request initialization | `30` | #### Decode Server Configuration diff --git a/python/sglang/srt/disaggregation/mooncake/conn.py b/python/sglang/srt/disaggregation/mooncake/conn.py index 940a25d74..824f76709 100644 --- a/python/sglang/srt/disaggregation/mooncake/conn.py +++ b/python/sglang/srt/disaggregation/mooncake/conn.py @@ -191,7 +191,7 @@ class MooncakeKVManager(BaseKVManager): self.heartbeat_failures = {} self.session_pool = defaultdict(requests.Session) self.session_pool_lock = threading.Lock() - self.addr_to_rooms_tracker = defaultdict(list) + self.addr_to_rooms_tracker = defaultdict(set) self.connection_lock = threading.Lock() # Heartbeat interval should be at least 2 seconds self.heartbeat_interval = max( @@ -504,12 +504,14 @@ class MooncakeKVManager(BaseKVManager): if response.status_code == 200: self.heartbeat_failures[bootstrap_addr] = 0 - for bootstrap_room in self.addr_to_rooms_tracker[ + current_rooms = self.addr_to_rooms_tracker[ bootstrap_addr - ]: - # Remove KVPoll.Success requests from the map + ].copy() + + for bootstrap_room in current_rooms: + # Remove KVPoll.Success requests from the tracker if bootstrap_room not in self.request_status: - self.addr_to_rooms_tracker[bootstrap_addr].remove( + self.addr_to_rooms_tracker[bootstrap_addr].discard( bootstrap_room ) else: @@ -879,9 +881,7 @@ class MooncakeKVReceiver(BaseKVReceiver): self.bootstrap_infos = self.kv_mgr.connection_pool[bootstrap_key] assert len(self.bootstrap_infos) > 0 - self.kv_mgr.addr_to_rooms_tracker[self.bootstrap_addr].append( - self.bootstrap_room - ) + self.kv_mgr.addr_to_rooms_tracker[self.bootstrap_addr].add(self.bootstrap_room) self.kv_mgr.update_status(self.bootstrap_room, KVPoll.WaitingForInput) def _get_bootstrap_info_from_server(self, engine_rank, target_dp_group):