[P/D][BugFix][v0.11.0-dev]Fix proxy format processing errors & Layerwise connector performance optimization (#4069)

### What this PR does / why we need it?
1.Fix proxy format processing errors.
2.Layer-wise connector performance optimization

Signed-off-by: wangxiaoteng <wangxiaoteng@huawei.com>
This commit is contained in:
wangxiaoteng888
2025-11-09 09:55:10 +08:00
committed by GitHub
parent 55e37f5041
commit c2d58c0655
3 changed files with 16 additions and 3 deletions

View File

@@ -19,6 +19,7 @@ import msgspec
import numpy as np
import numpy.typing as npt
import torch
import torch_npu
import zmq
from mooncake.engine import TransferEngine # type: ignore
from vllm.config import VllmConfig
@@ -87,6 +88,8 @@ class KVCacheSendingLayerThread(threading.Thread):
self.total_layers = total_layers
self.use_mla = use_mla
self.block_len = block_len
self.model_stream = torch_npu.npu.current_stream()
self.current_layer = -1
if self.pd_head_ratio > 1:
# regesit kv buffer for tp inequal
@@ -186,7 +189,9 @@ class KVCacheSendingLayerThread(threading.Thread):
src_list.append(src)
dst_list.append(dst)
length_list.append(length)
torch.npu.synchronize()
if self.current_layer != layer_index:
self.current_layer = layer_index
self.model_stream.synchronize()
ret = self.engine.batch_transfer_sync_write(
session_id, src_list, dst_list, length_list)
if ret < 0:
@@ -237,7 +242,7 @@ class KVCacheSendingLayerThread(threading.Thread):
((self.tp_rank // self.num_head_replica) %
self.pd_head_ratio))
src_layer_addr += length
torch.npu.synchronize()
self.model_stream.synchronize()
ret = self.engine.batch_transfer_sync_write(
session_id, src_list, dst_list, length_list)
if ret < 0: