【main】【bugfix】Resolved memory deallocation failure in the pooling layer under re-computation workloads. (#6045)
### What this PR does / why we need it?
Resolved a double-free memory vulnerability in the pooling layer under
re-computation scenarios.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
- vLLM version: v0.13.0
- vLLM main:
d68209402d
Signed-off-by: fems14 <1804143737@qq.com>
This commit is contained in:
@@ -145,7 +145,7 @@ class AscendStoreConnector(KVConnectorBase_V1):
|
|||||||
"""Get the finished recving and sending requests."""
|
"""Get the finished recving and sending requests."""
|
||||||
assert self.connector_worker is not None
|
assert self.connector_worker is not None
|
||||||
done_sending, done_recving = self.connector_worker.get_finished(
|
done_sending, done_recving = self.connector_worker.get_finished(
|
||||||
finished_req_ids)
|
finished_req_ids, self._get_connector_metadata())
|
||||||
return done_sending, done_recving
|
return done_sending, done_recving
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -379,9 +379,10 @@ class ReqMeta:
|
|||||||
|
|
||||||
class AscendConnectorMetadata(KVConnectorMetadata):
|
class AscendConnectorMetadata(KVConnectorMetadata):
|
||||||
|
|
||||||
def __init__(self, unfinished_request_ids):
|
def __init__(self, unfinished_request_ids, preempted_req_ids):
|
||||||
self.requests = []
|
self.requests = []
|
||||||
self.unfinished_request_ids = unfinished_request_ids
|
self.unfinished_request_ids = unfinished_request_ids
|
||||||
|
self.preempted_req_ids = preempted_req_ids
|
||||||
|
|
||||||
def add_request(self, req_meta: ReqMeta) -> None:
|
def add_request(self, req_meta: ReqMeta) -> None:
|
||||||
"""Add a request to the metadata.
|
"""Add a request to the metadata.
|
||||||
|
|||||||
@@ -117,6 +117,11 @@ class KVCacheStoreSendingThread(KVTransferThread):
|
|||||||
with self.done_task_lock:
|
with self.done_task_lock:
|
||||||
self.stored_requests[req_id] += 1
|
self.stored_requests[req_id] += 1
|
||||||
|
|
||||||
|
def dec_stored_request(self, req_id: str):
|
||||||
|
with self.done_task_lock:
|
||||||
|
if req_id in self.stored_requests:
|
||||||
|
self.stored_requests[req_id] -= 1
|
||||||
|
|
||||||
def delete_finished_stored_request(self, req_id: str):
|
def delete_finished_stored_request(self, req_id: str):
|
||||||
with self.done_task_lock:
|
with self.done_task_lock:
|
||||||
if req_id in self.stored_requests:
|
if req_id in self.stored_requests:
|
||||||
@@ -130,6 +135,10 @@ class KVCacheStoreSendingThread(KVTransferThread):
|
|||||||
starts = []
|
starts = []
|
||||||
ends = []
|
ends = []
|
||||||
keys = []
|
keys = []
|
||||||
|
if req_id not in self.stored_requests:
|
||||||
|
self.request_queue.task_done()
|
||||||
|
return
|
||||||
|
|
||||||
for start, end, key in self.token_database.process_tokens(
|
for start, end, key in self.token_database.process_tokens(
|
||||||
token_len, req_meta.block_hashes):
|
token_len, req_meta.block_hashes):
|
||||||
starts.append(start)
|
starts.append(start)
|
||||||
@@ -142,15 +151,13 @@ class KVCacheStoreSendingThread(KVTransferThread):
|
|||||||
keys = keys[self.tp_rank % self.put_step::self.put_step]
|
keys = keys[self.tp_rank % self.put_step::self.put_step]
|
||||||
|
|
||||||
if not keys:
|
if not keys:
|
||||||
with self.done_task_lock:
|
self.dec_stored_request(req_id)
|
||||||
self.stored_requests[req_id] -= 1
|
|
||||||
return
|
return
|
||||||
|
|
||||||
skip_block_num = self.lookup(keys)
|
skip_block_num = self.lookup(keys)
|
||||||
|
|
||||||
if skip_block_num == len(keys):
|
if skip_block_num == len(keys):
|
||||||
with self.done_task_lock:
|
self.dec_stored_request(req_id)
|
||||||
self.stored_requests[req_id] -= 1
|
|
||||||
return
|
return
|
||||||
|
|
||||||
starts = starts[skip_block_num:]
|
starts = starts[skip_block_num:]
|
||||||
@@ -189,8 +196,7 @@ class KVCacheStoreSendingThread(KVTransferThread):
|
|||||||
current_event.synchronize()
|
current_event.synchronize()
|
||||||
self.m_store.put(keys, addrs, sizes)
|
self.m_store.put(keys, addrs, sizes)
|
||||||
|
|
||||||
with self.done_task_lock:
|
self.dec_stored_request(req_id)
|
||||||
self.stored_requests[req_id] -= 1
|
|
||||||
self.request_queue.task_done()
|
self.request_queue.task_done()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -162,7 +162,7 @@ class KVPoolScheduler:
|
|||||||
self._unfinished_requests.pop(finished_req_id, None)
|
self._unfinished_requests.pop(finished_req_id, None)
|
||||||
self._unfinished_request_ids.discard(finished_req_id)
|
self._unfinished_request_ids.discard(finished_req_id)
|
||||||
|
|
||||||
meta = AscendConnectorMetadata(self._unfinished_request_ids)
|
meta = AscendConnectorMetadata(self._unfinished_request_ids, scheduler_output.preempted_req_ids)
|
||||||
|
|
||||||
for request in scheduler_output.scheduled_new_reqs:
|
for request in scheduler_output.scheduled_new_reqs:
|
||||||
# Right now, we only load KV for new requests
|
# Right now, we only load KV for new requests
|
||||||
|
|||||||
@@ -463,10 +463,10 @@ class KVPoolWorker:
|
|||||||
yield
|
yield
|
||||||
|
|
||||||
def get_finished(self,
|
def get_finished(self,
|
||||||
finished_req_ids: set[str]) -> tuple[set[str], set[str]]:
|
finished_req_ids: set[str], meta:AscendConnectorMetadata) -> tuple[set[str], set[str]]:
|
||||||
done_sending = (
|
done_sending = (
|
||||||
self.get_and_clear_finished_requests(
|
self.get_and_clear_finished_requests(
|
||||||
finished_req_ids # type: ignore[union-attr]
|
finished_req_ids, meta # type: ignore[union-attr]
|
||||||
) if self.kv_role in ['kv_producer', 'kv_both']
|
) if self.kv_role in ['kv_producer', 'kv_both']
|
||||||
or self.consumer_is_to_put else set())
|
or self.consumer_is_to_put else set())
|
||||||
|
|
||||||
@@ -481,8 +481,11 @@ class KVPoolWorker:
|
|||||||
self.tp_rank)
|
self.tp_rank)
|
||||||
return done_sending, done_recving
|
return done_sending, done_recving
|
||||||
|
|
||||||
def get_and_clear_finished_requests(self, finished_req_ids) -> set[str]:
|
def get_and_clear_finished_requests(self, finished_req_ids, meta:AscendConnectorMetadata) -> set[str]:
|
||||||
finished_sending = set()
|
finished_sending = set()
|
||||||
|
for req_id in meta.preempted_req_ids:
|
||||||
|
self.kv_send_thread.delete_finished_stored_request( # type: ignore[union-attr]
|
||||||
|
req_id)
|
||||||
for req_id in self.kv_send_thread.stored_requests.copy( # type: ignore[union-attr]
|
for req_id in self.kv_send_thread.stored_requests.copy( # type: ignore[union-attr]
|
||||||
):
|
):
|
||||||
if self.kv_send_thread.stored_requests[ # type: ignore[union-attr]
|
if self.kv_send_thread.stored_requests[ # type: ignore[union-attr]
|
||||||
|
|||||||
Reference in New Issue
Block a user