[P/D][bugfix]Fix the PCP port mapping error issue (#5706)

### What this PR does / why we need it?
Fix the PCP port mapping error issue.In a multi-node PD separation
scenario, when the PCP feature is enabled, there is an issue with the
ZMQ transmission port. Specifically, the IP and port received by Side D
do not match. The cause of this issue is an error in the port mapping
update strategy logic.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By ci
- vLLM version: v0.13.0
- vLLM main:
2f4e6548ef

---------

Signed-off-by: wangxiaoteng <wangxiaoteng@huawei.com>
This commit is contained in:
wangxiaoteng888
2026-01-10 22:43:52 +08:00
committed by GitHub
parent ff4c1a47b3
commit aa987ffe87
2 changed files with 25 additions and 18 deletions

View File

@@ -1264,7 +1264,7 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
tp_rank, pcp_rank, _prefill_tp_size,
remote_pcp_size, remote_dcp_size,
remote_port, remote_block_ids,
local_block_ids):
local_block_ids, remote_engine_id):
worker = MooncakeConnectorWorker(self.vllm_config, self.engine_id)
@@ -1275,7 +1275,7 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
worker.tp_rank = tp_rank
worker.pcp_rank = pcp_rank
worker._prefill_tp_size = _prefill_tp_size
worker.local_remote_block_port_mapping = None
worker.local_remote_block_port_mapping = {}
worker.block_size = 16
worker.num_key_value_heads = 1
@@ -1289,6 +1289,7 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
meta.num_external_tokens = pcp_size * dcp_size * len(
local_block_ids) * worker.block_size
meta.num_prompt_blocks = pcp_size * dcp_size * len(local_block_ids)
meta.remote_engine_id = remote_engine_id
remote_handshake_port_list, local_block_ids_list, remote_block_ids_list = worker._get_kv_split_metadata(
'0', meta)
@@ -1297,14 +1298,14 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
self.assertEqual(
get_kv_split_metadata(True, 1, 1, 8, 1, 0, 8, 1, 8, 30000, [1],
[1]),
[1], 0),
([[30001], [30002], [30003], [30004], [30005], [30006], [30007],
[30000]], [[], [], [], [], [], [], [], [1]], [[], [], [], [], [],
[], [], [1]]))
self.assertEqual(
get_kv_split_metadata(False, 1, 1, 8, 1, 0, 8, 2, 8, 30000, [1],
[1]),
[1], 0),
([[30001], [30002], [30003], [30004], [30005], [30006], [30007],
[30008], [30009], [30010], [30011], [30012], [30013], [30014],
[30015], [30000]
@@ -1314,29 +1315,29 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
self.assertEqual(
get_kv_split_metadata(True, 1, 1, 8, 1, 0, 8, 2, 2, 30000, [1],
[1]),
[1], 0),
([[30001], [30008], [30009], [30000]], [[], [], [], [1]
], [[], [], [], [1]]))
self.assertEqual(
get_kv_split_metadata(False, 1, 1, 8, 1, 0, 8, 2, 2, 30000, [1],
[1]),
[1], 0),
([[30001], [30008], [30009], [30000]], [[], [], [], [1]
], [[], [], [], [1]]))
self.assertEqual(
get_kv_split_metadata(True, 1, 2, 8, 1, 0, 8, 2, 2, 30000, [1],
[1]),
[1], 0),
([[30000], [30008]], [[1], []], [[1], []]))
self.assertEqual(
get_kv_split_metadata(False, 1, 2, 8, 1, 0, 8, 2, 2, 30000, [1],
[1]),
[1], 0),
([[30000], [30008]], [[1], []], [[1], []]))
self.assertEqual(
get_kv_split_metadata(True, 1, 2, 8, 0, 0, 8, 2, 2, 30000,
[1, 2, 3], [1, 2, 3, 4, 5]),
[1, 2, 3], [1, 2, 3, 4, 5], 0),
([[30000], [30008]], [[1, 2, 3], [4, 5]], [[1, 2, 3], [1, 2]]))

View File

@@ -1154,8 +1154,9 @@ class MooncakeConnectorWorker:
num_p_block_heads = max(
1, self.num_key_value_heads // self._prefill_tp_size)
self.tp_num_need_pulls = num_d_block_heads // num_p_block_heads
self.local_remote_block_port_mapping = None
self.remote_port_send_num: dict[int, int] = {}
self.local_remote_block_port_mapping: dict[
str, Optional[List[List[int]]]] = {}
self.remote_port_send_num: dict[str, dict[int, int]] = {}
def _get_prefill_decode_size(self, vllm_config: VllmConfig):
# get prefill tp and dp size from extra config
@@ -1453,16 +1454,20 @@ class MooncakeConnectorWorker:
remote_port_send_num[remote_port] += 1
return remote_port_send_num
if self.local_remote_block_port_mapping is None:
if meta.remote_engine_id not in self.local_remote_block_port_mapping:
self.local_remote_block_port_mapping[meta.remote_engine_id] = None
if self.local_remote_block_port_mapping[meta.remote_engine_id] is None:
local_remote_block_port_mappings = get_local_remote_block_port_mappings(
)
self.local_remote_block_port_mapping = local_remote_block_port_mappings[
self.handshake_port]
self.remote_port_send_num = get_remote_port_send_num(
local_remote_block_port_mappings)
self.local_remote_block_port_mapping[
meta.remote_engine_id] = local_remote_block_port_mappings[
self.handshake_port]
self.remote_port_send_num[
meta.remote_engine_id] = get_remote_port_send_num(
local_remote_block_port_mappings)
local_remote_block_port_mapping = copy.deepcopy(
self.local_remote_block_port_mapping)
self.local_remote_block_port_mapping[meta.remote_engine_id])
num_external_blocks = math.ceil(meta.num_external_tokens /
self.block_size)
@@ -1568,7 +1573,8 @@ class MooncakeConnectorWorker:
pcp_dcp_rank][i],
offset=i,
tp_num_need_pulls=self.tp_num_need_pulls,
remote_port_send_num=self.remote_port_send_num,
remote_port_send_num=self.remote_port_send_num[
meta.remote_engine_id],
all_task_done=(
pcp_dcp_rank
== len(remote_handshake_port_list) - 1