From 8b98d7a4e8be13240dfb2c02e462c64ffaaa9101 Mon Sep 17 00:00:00 2001 From: fems14 <74094523+fems14@users.noreply.github.com> Date: Tue, 20 Jan 2026 22:56:04 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90main=E3=80=91=E3=80=90bugfix=E3=80=91R?= =?UTF-8?q?esolved=20memory=20deallocation=20failure=20in=20the=20pooling?= =?UTF-8?q?=20layer=20under=20re-computation=20workloads.=20(#6045)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What this PR does / why we need it? Resolved a double-free memory vulnerability in the pooling layer under re-computation scenarios. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? - vLLM version: v0.13.0 - vLLM main: https://github.com/vllm-project/vllm/commit/d68209402ddab3f54a09bc1f4de9a9495a283b60 Signed-off-by: fems14 <1804143737@qq.com> --- .../ascend_store/ascend_store_connector.py | 2 +- .../kv_pool/ascend_store/config_data.py | 3 ++- .../kv_pool/ascend_store/kv_transfer.py | 18 ++++++++++++------ .../kv_pool/ascend_store/pool_scheduler.py | 2 +- .../kv_pool/ascend_store/pool_worker.py | 9 ++++++--- 5 files changed, 22 insertions(+), 12 deletions(-) diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py index 2b221ee7..7580f59c 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py @@ -145,7 +145,7 @@ class AscendStoreConnector(KVConnectorBase_V1): """Get the finished recving and sending requests.""" assert self.connector_worker is not None done_sending, done_recving = self.connector_worker.get_finished( - finished_req_ids) + finished_req_ids, self._get_connector_metadata()) return done_sending, done_recving diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py index 4160f7ff..676018ed 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py @@ -379,9 +379,10 @@ class ReqMeta: class AscendConnectorMetadata(KVConnectorMetadata): - def __init__(self, unfinished_request_ids): + def __init__(self, unfinished_request_ids, preempted_req_ids): self.requests = [] self.unfinished_request_ids = unfinished_request_ids + self.preempted_req_ids = preempted_req_ids def add_request(self, req_meta: ReqMeta) -> None: """Add a request to the metadata. diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py index 0eeedbf4..84f289bf 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py @@ -117,6 +117,11 @@ class KVCacheStoreSendingThread(KVTransferThread): with self.done_task_lock: self.stored_requests[req_id] += 1 + def dec_stored_request(self, req_id: str): + with self.done_task_lock: + if req_id in self.stored_requests: + self.stored_requests[req_id] -= 1 + def delete_finished_stored_request(self, req_id: str): with self.done_task_lock: if req_id in self.stored_requests: @@ -130,6 +135,10 @@ class KVCacheStoreSendingThread(KVTransferThread): starts = [] ends = [] keys = [] + if req_id not in self.stored_requests: + self.request_queue.task_done() + return + for start, end, key in self.token_database.process_tokens( token_len, req_meta.block_hashes): starts.append(start) @@ -142,15 +151,13 @@ class KVCacheStoreSendingThread(KVTransferThread): keys = keys[self.tp_rank % self.put_step::self.put_step] if not keys: - with self.done_task_lock: - self.stored_requests[req_id] -= 1 + self.dec_stored_request(req_id) return skip_block_num = self.lookup(keys) if skip_block_num == len(keys): - with self.done_task_lock: - self.stored_requests[req_id] -= 1 + self.dec_stored_request(req_id) return starts = starts[skip_block_num:] @@ -189,8 +196,7 @@ class KVCacheStoreSendingThread(KVTransferThread): current_event.synchronize() self.m_store.put(keys, addrs, sizes) - with self.done_task_lock: - self.stored_requests[req_id] -= 1 + self.dec_stored_request(req_id) self.request_queue.task_done() diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py index 3ca2ca49..a50f950c 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py @@ -162,7 +162,7 @@ class KVPoolScheduler: self._unfinished_requests.pop(finished_req_id, None) self._unfinished_request_ids.discard(finished_req_id) - meta = AscendConnectorMetadata(self._unfinished_request_ids) + meta = AscendConnectorMetadata(self._unfinished_request_ids, scheduler_output.preempted_req_ids) for request in scheduler_output.scheduled_new_reqs: # Right now, we only load KV for new requests diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py index 19f1c5b8..25080a2b 100644 --- a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py @@ -463,10 +463,10 @@ class KVPoolWorker: yield def get_finished(self, - finished_req_ids: set[str]) -> tuple[set[str], set[str]]: + finished_req_ids: set[str], meta:AscendConnectorMetadata) -> tuple[set[str], set[str]]: done_sending = ( self.get_and_clear_finished_requests( - finished_req_ids # type: ignore[union-attr] + finished_req_ids, meta # type: ignore[union-attr] ) if self.kv_role in ['kv_producer', 'kv_both'] or self.consumer_is_to_put else set()) @@ -481,8 +481,11 @@ class KVPoolWorker: self.tp_rank) return done_sending, done_recving - def get_and_clear_finished_requests(self, finished_req_ids) -> set[str]: + def get_and_clear_finished_requests(self, finished_req_ids, meta:AscendConnectorMetadata) -> set[str]: finished_sending = set() + for req_id in meta.preempted_req_ids: + self.kv_send_thread.delete_finished_stored_request( # type: ignore[union-attr] + req_id) for req_id in self.kv_send_thread.stored_requests.copy( # type: ignore[union-attr] ): if self.kv_send_thread.stored_requests[ # type: ignore[union-attr]