[v0.18.0][BugFix][KV Pool] Fix the conflict between pooling scenarios and PCP across machines (#8101)

### What this PR does / why we need it?
The Memcache and Mooncake KV-pool backends previously used the global `parallel_config.rank` both to initialize the object store and to pick the NPU device in `set_device()`. When pooling is combined with PCP across machines, the global rank can exceed the number of NPUs on a single node, so `torch.device(f"npu:{rank}")` points at a device that does not exist locally. This PR switches both backends to the per-node local rank from `get_world_group()` and de-duplicates the A2-specific branch in the Memcache backend so store initialization goes through a single code path.
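The sketch below is not part of the PR; it uses made-up rank values for a hypothetical 2-node deployment to illustrate the underlying problem: the global rank used by the old code can index an NPU that does not exist on the local node, while the per-node local rank always stays within range.

```python
# Illustration only: hypothetical ranks for a 2-node deployment, 8 NPUs per node.
NPUS_PER_NODE = 8

global_rank = 12                           # process 12 in the world group (runs on node 1)
local_rank = global_rank % NPUS_PER_NODE   # 4, i.e. the 5th NPU on its own node

# Old behaviour (self.rank = parallel_config.rank):
#   torch.device(f"npu:{global_rank}") -> "npu:12", which does not exist on an 8-NPU node.
# New behaviour (get_world_group().local_rank):
#   torch.device(f"npu:{local_rank}")  -> "npu:4", a valid device index on every node.
print(f"old: npu:{global_rank}  new: npu:{local_rank}")
```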

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Signed-off-by: DreamLeader <2270923832@qq.com>
Author: DreamerLeader
Date: 2026-04-09 21:55:56 +08:00
Committed by: GitHub
Parent: 054fde7b72
Commit: 531d0e6fff
2 changed files with 9 additions and 15 deletions

File 1 of 2: MemcacheBackend

@@ -3,6 +3,7 @@ from enum import Enum
 import torch
 from vllm.config import ParallelConfig
+from vllm.distributed.parallel_state import get_world_group
 from vllm.logger import logger
 from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.backend import Backend
@@ -29,21 +30,13 @@ class MemcacheBackend(Backend):
         try:
             soc_version = get_ascend_device_type()
             if soc_version in {AscendDeviceType.A2}:
-                import torch
-                from vllm.distributed import get_world_group
                 tmp_tensor = torch.zeros(1, device="npu")
                 output_tensor_list = [torch.empty_like(tmp_tensor) for _ in range(torch.distributed.get_world_size())]
                 torch.distributed.all_gather(output_tensor_list, tmp_tensor, group=get_world_group().device_group)
-                self.rank = parallel_config.rank
-                self.store = DistributedObjectStore()
-                res = self.store.init(self.rank)
-                assert res == 0
-            else:
-                self.rank = parallel_config.rank
-                self.store = DistributedObjectStore()
-                res = self.store.init(self.rank)
-                assert res == 0
+            self.local_rank = get_world_group().local_rank
+            self.store = DistributedObjectStore()
+            res = self.store.init(self.local_rank)
+            assert res == 0
         except ValueError as e:
             logger.error("Configuration loading failed: %s", e)
             raise
@@ -52,7 +45,7 @@ class MemcacheBackend(Backend):
             raise

     def set_device(self):
-        device = torch.device(f"npu:{self.rank}")
+        device = torch.device(f"npu:{self.local_rank}")
         torch.npu.set_device(device)

     def register_buffer(self, ptrs: list[int], sizes: list[int]):
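For readability, here is a sketch of how the memcache backend's initialization and device selection fit together after this change. It is reconstructed from the diff above and assumes the vLLM world group is already initialized and an Ascend build of PyTorch (torch_npu) is available; `DistributedObjectStore` is replaced by a trivial stub, so treat it as an illustration rather than the actual module.

```python
import torch
from vllm.distributed.parallel_state import get_world_group


class _StubStore:
    """Hypothetical stand-in for DistributedObjectStore (illustration only)."""

    def init(self, local_rank: int) -> int:
        return 0  # 0 signals success, mirroring the assert in the diff


class MemcacheBackendSketch:
    def __init__(self) -> None:
        # Single code path: both the store and the NPU device index now key off
        # the per-node local rank instead of the global parallel_config.rank.
        self.local_rank = get_world_group().local_rank
        self.store = _StubStore()
        res = self.store.init(self.local_rank)
        assert res == 0

    def set_device(self) -> None:
        # Valid on every node, even when the global rank exceeds the local NPU count.
        torch.npu.set_device(torch.device(f"npu:{self.local_rank}"))
```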

File 2 of 2: MooncakeBackend

@@ -8,6 +8,7 @@ import torch
 # Third Party
 from vllm.config import ParallelConfig
+from vllm.distributed.parallel_state import get_world_group
 from vllm.logger import logger
 from vllm.utils.network_utils import get_ip
@@ -30,7 +31,6 @@ class MooncakeBackend(Backend):
             ) from e
         self.config = MooncakeStoreConfig.load_from_env()
         self.store = MooncakeDistributedStore()
-        self.rank = parallel_config.rank
         if self.config.protocol == "ascend":
             local_hostname = get_ip()
             # ASCEND_ENABLE_USE_FABRIC_MEM: Enable unified memory address direct transmission scheme
@@ -67,7 +67,8 @@ class MooncakeBackend(Backend):
             raise RuntimeError(msg)

     def set_device(self):
-        device = torch.device(f"npu:{self.rank}")
+        local_rank = get_world_group().local_rank
+        device = torch.device(f"npu:{local_rank}")
         torch.npu.set_device(device)

     def register_buffer(self, ptrs: list[int], lengths: list[int]):
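One small design difference between the two backends after this patch: MemcacheBackend keeps `self.local_rank` from initialization, while MooncakeBackend drops the cached rank attribute entirely and asks the world group for the local rank at the moment `set_device()` is called. A minimal sketch of the Mooncake side (illustration only, assuming an initialized vLLM world group and an Ascend build of PyTorch):

```python
import torch
from vllm.distributed.parallel_state import get_world_group


class MooncakeBackendSketch:
    """Illustrative fragment; the real class lives in the mooncake backend module."""

    def set_device(self) -> None:
        # Look up the per-node local rank lazily instead of caching a global rank.
        local_rank = get_world_group().local_rank
        torch.npu.set_device(torch.device(f"npu:{local_rank}"))
```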