[v0.11.0][bugfix]kvpool sync load (#3698) (#3722)

### What this PR does / why we need it?
In certain scenarios, the performance of synchronously loading data from
the pool is better than that of asynchronously loading data. Therefore,
a control logic (or switch) for asynchronous loading from the pool has
been added.
### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.11.0rc3
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.0

---------

<!--  Thanks for sending a pull request!

BEFORE SUBMITTING, PLEASE READ
https://docs.vllm.ai/en/latest/contributing/overview.html

Signed-off-by: fems14 <1804143737@qq.com>
This commit is contained in:
fems14
2025-10-24 18:21:46 +08:00
committed by GitHub
parent 33514a4cc2
commit f0eb3e1d97
4 changed files with 89 additions and 21 deletions

View File

@@ -10,7 +10,13 @@
* vLLM-Ascendmain branch
* Mooncake[AscendTransport/Mooncake at pooling-async-memcpy](https://github.com/AscendTransport/Mooncake/tree/pooling-async-memcpy)(Currently available branch code, continuously updated.)
Installation and Compilation Guidehttps://github.com/AscendTransport/Mooncake/tree/pooling-async-memcpy?tab=readme-ov-file#build-and-use-binaries
### KV Pooling Parameter Description
**kv_connector_extra_config**:Additional Configurable Parameters for Pooling
**mooncake_rpc_port**:Port for RPC Communication Between Pooling Scheduler Process and Worker Process: Each Instance Requires a Unique Port Configuration.
**load_async**:Whether to Enable Asynchronous Loading. The default value is false.
**register_buffer**:Whether to Register Video Memory with the Backend. Registration is Not Required When Used with MooncakeConnectorV1; It is Required in All Other Cases. The Default Value is false.
## run mooncake master
### 1.Configure mooncake.json

View File

@@ -37,6 +37,10 @@ class MooncakeEngine:
self.tp_rank = parallel_config.rank
self.tp_size = parallel_config.tensor_parallel_size
self.kv_role = vllm_config.kv_transfer_config.kv_role
self.load_async = vllm_config.kv_transfer_config.kv_connector_extra_config.get(
"load_async", False)
self.register_buffer = vllm_config.kv_transfer_config.kv_connector_extra_config.get(
"register_buffer", False)
self.block_size = vllm_config.cache_config.block_size
self.current_layer = 0
# self.use_mla = first_kv_cache_tuple[0].size(
@@ -102,7 +106,6 @@ class MooncakeEngine:
self.use_mla, first_kv_cache.shape)
self.kv_caches = kv_caches
self.m_store.set_kv_caches(kv_caches.values())
self.kv_caches_base_addr = []
for cache_or_caches in kv_caches.values():
# Normalize to always be a list of caches
@@ -110,12 +113,28 @@ class MooncakeEngine:
for i, cache in enumerate(cache_or_caches, 0):
base_addr = cache.data_ptr()
self.kv_caches_base_addr.append(base_addr)
if self.register_buffer:
region_len = self.num_blocks * self.block_len[i % 2]
self._register(base_addr, region_len)
else:
cache_list = [cache_or_caches
] if self.use_mla else cache_or_caches
for cache in cache_list:
base_addr = cache.data_ptr()
self.kv_caches_base_addr.append(base_addr)
if self.register_buffer:
region_len = self.num_blocks * self.block_len[0]
self._register(base_addr, region_len)
def _register(self, ptr, length):
logger.debug(
"Registering KV cache: ptr=0x%x, length=%d, num_blocks=%d, "
"block_lens=%s", ptr, length, self.num_blocks, self.block_len)
try:
self.m_store.register_buffer(ptr, length)
except Exception as e:
raise RuntimeError(
f"Mooncake memory registration failed. Error is: {e}")
if self.use_layerwise:
self.get_event = threading.Event()
@@ -142,13 +161,14 @@ class MooncakeEngine:
self.kv_caches_base_addr, self.token_database,
self.block_len, self.block_size, ready_event_sending)
self.kv_send_thread.start()
ready_event = threading.Event()
self.kv_recv_thread = KVCacheStoreRecvingThread(
self.tp_rank, self.tp_size, self.m_store,
self.kv_caches_base_addr, self.token_database, self.block_len,
self.block_size, ready_event)
self.kv_recv_thread.start()
ready_event.wait()
if self.load_async:
ready_event = threading.Event()
self.kv_recv_thread = KVCacheStoreRecvingThread(
self.tp_rank, self.tp_size, self.m_store,
self.kv_caches_base_addr, self.token_database,
self.block_len, self.block_size, ready_event)
self.kv_recv_thread.start()
ready_event.wait()
def start_load_kv(self, metadata: MooncakeConnectorMetadata):
self.current_layer = 0
@@ -179,12 +199,49 @@ class MooncakeEngine:
next(layerwise_retriever) # first layer load
self.layerwise_retrievers.append(layerwise_retriever)
else:
self.kv_recv_thread.add_request( # type: ignore[union-attr]
req_id,
tokens,
request.block_ids,
token_mask,
)
if self.load_async:
self.kv_recv_thread.add_request( # type: ignore[union-attr]
req_id,
tokens,
request.block_ids,
token_mask,
)
else:
if self.m_store.config.use_ascend_direct:
addr_list = []
size_list = []
key_list = []
blockIds = []
for start, end, key in self.token_database.process_tokens(
tokens, token_mask):
addr, size, block_id = self.prepare_value(
start, end, request.block_ids)
key_list.append(key.to_string())
addr_list.append(addr)
size_list.append(size)
blockIds.append(block_id)
self.m_store.get_batch(key_list, addr_list, size_list,
blockIds)
else:
for start, end, key in self.token_database.process_tokens(
tokens, token_mask):
addr, size, _ = self.prepare_value(
start, end, request.block_ids)
self.m_store.get(key, addr, size)
def prepare_value(self, start: int, end: int, block_ids: list[int]):
addr_list = []
size_list = []
block_id = block_ids[start // self.block_size]
for index, base_addr in enumerate(self.kv_caches_base_addr):
block_len = (self.block_len[index % 2]
if self.use_mla else self.block_len[0])
addr = base_addr + block_id * block_len
length = int(block_len / self.block_size * (end - start))
addr_list.append(addr)
size_list.append(length)
return addr_list, size_list, block_id
def wait_for_layer_load(self) -> None:
"""MooncakeConnector does not do layerwise saving."""
@@ -430,8 +487,11 @@ class MooncakeEngine:
self.kv_send_thread.
get_and_clear_finished_requests( # type: ignore[union-attr]
) if self.kv_role in ['kv_producer', 'kv_both'] else set())
done_recving = self.kv_recv_thread.get_and_clear_finished_requests( # type: ignore[union-attr]
)
done_recving = (
self.kv_recv_thread.
get_and_clear_finished_requests( # type: ignore[union-attr]
) if self.load_async else set())
logger.debug(
"Number of completed KV cache send requests: %d, receive "

View File

@@ -65,15 +65,15 @@ class Mooncakestore():
logger.error(msg)
raise RuntimeError(msg)
def set_kv_caches(self, kvcache):
self.kvcache = list(kvcache)
def exists(self, key: MooncakeEngineKey) -> bool:
return self.store.is_exist(key.to_string()) == 1
def batch_exists(self, keys: list[str]) -> list[bool]:
return self.store.batch_is_exist(keys)
def register_buffer(self, ptr, length):
return self.store.register_buffer(ptr, length)
def get_batch(self, keys: list[str], addrs: list[list[int]],
sizes: list[list[int]], block_ids: list[int]):
try:

View File

@@ -165,6 +165,8 @@ class MooncakeStoreConnectorV1Scheduler:
self.kv_role = vllm_config.kv_transfer_config.kv_role
self.consumer_is_to_load = vllm_config.kv_transfer_config.kv_connector_extra_config.get(
"consumer_is_to_load", False)
self.load_async = vllm_config.kv_transfer_config.kv_connector_extra_config.get(
"load_async", False)
# request_id -> (vllm cached tokes, mooncake cached tokens)
self.load_specs: dict[str, LoadSpec] = {}
self._block_size = vllm_config.cache_config.block_size
@@ -229,7 +231,7 @@ class MooncakeStoreConnectorV1Scheduler:
can_load=False,
)
return need_to_allocate, not self.use_layerwise
return need_to_allocate, self.load_async
def update_state_after_alloc(self, request: "Request",
blocks: "KVCacheBlocks",