[Cherry Pick from pr#3981][0.11.0][P/D]Make kv-transfer env variable take effect & Fix load-balance proxy (#3983)

### What this PR does / why we need it?
Make kv-transfer env variable take effect & Fix load-balance proxy.
Cherry Pick from #3981

---------
Signed-off-by: nwpu-zxr <zhouxuerong2@huawei.com>
This commit is contained in:
zxr2333
2025-11-08 13:52:33 +08:00
committed by GitHub
parent 8e72758645
commit d4e2a44307
7 changed files with 35 additions and 9 deletions

View File

@@ -561,9 +561,12 @@ async def metaserver(request: Request):
max_retries=global_args.max_retries,
base_delay=global_args.retry_delay)
proxy_state.release_prefiller(prefiller_idx, prefiller_score)
proxy_state.release_prefiller_kv(prefiller_idx,prefiller_score)
except Exception as e:
logger.error(f"Post metaserver failed with: {str(e)}")
proxy_state.release_prefiller(prefiller_idx, prefiller_score)
proxy_state.release_prefiller_kv(prefiller_idx, prefiller_score)
if __name__ == '__main__':

View File

@@ -1018,7 +1018,9 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
self.mock_transfer_engine.register_memory.return_value = 0
self.patches = [
patch('os.getenv', return_value="10,11"),
patch(
'vllm_ascend.distributed.mooncake_layerwise_connector.envs_ascend.PHYSICAL_DEVICES',
'10,11'),
patch('torch.Tensor.size', return_value=(10, 16, 8, 16)),
patch('torch.Tensor.element_size', return_value=4),
patch('torch.Tensor.data_ptr', return_value=0x1000),
@@ -1047,8 +1049,6 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
MagicMock()),
patch('vllm_ascend.distributed.mooncake_connector.threading.Event',
MagicMock()),
patch.dict('sys.modules',
{'vllm_ascend.envs': self.envs_ascend_mock}),
]
for p in self.patches:
@@ -1136,4 +1136,4 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@@ -800,7 +800,9 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
self.mock_transfer_engine.register_memory.return_value = 0
self.patches = [
patch('os.getenv', return_value="10,11"),
patch(
'vllm_ascend.distributed.mooncake_layerwise_connector.envs_ascend.PHYSICAL_DEVICES',
'10,11'),
patch('torch.Tensor.size', return_value=(10, 16, 8, 16)),
patch('torch.Tensor.element_size', return_value=4),
patch('torch.Tensor.data_ptr', return_value=0x1000),
@@ -833,8 +835,6 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
patch(
'vllm_ascend.distributed.mooncake_layerwise_connector.threading.Event',
MagicMock()),
patch.dict('sys.modules',
{'vllm_ascend.envs': self.envs_ascend_mock}),
patch(
'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config',
return_value=SimpleNamespace(pd_tp_ratio=1,
@@ -913,4 +913,4 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@@ -30,6 +30,7 @@ from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.request import Request, RequestStatus
import vllm_ascend.envs as envs_ascend
from vllm_ascend.distributed.utils import get_transfer_timeout_value
from vllm_ascend.utils import AscendSocVersion, get_ascend_soc_version
TORCH_DTYPE_TO_NPU_DTYPE = {
@@ -411,7 +412,7 @@ class LLMDataDistCMgrConnectorWorker():
assert self.local_agent_metadata is not None
llm_config = LLMConfig()
llm_config.device_id = self.local_rank
llm_config.sync_kv_timeout = 20000
llm_config.sync_kv_timeout = get_transfer_timeout_value()
llm_config.enable_switch_role = True
llm_config.enable_cache_manager = True
llm_config.enable_remote_cache_accessible = True

View File

@@ -2,6 +2,7 @@
import contextlib
import hashlib
import math
import os
import queue
import random
import struct
@@ -33,6 +34,7 @@ from vllm.v1.request import RequestStatus
import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config
from vllm_ascend.distributed.mooncake.transfer_engine import get_global_te
from vllm_ascend.distributed.utils import get_transfer_timeout_value
if TYPE_CHECKING:
from vllm.attention.backends.abstract import AttentionMetadata
@@ -849,6 +851,8 @@ class MooncakeConnectorWorker:
def __init__(self, vllm_config: VllmConfig, engine_id: str):
self._get_prefill_decode_size(vllm_config)
os.environ["ASCEND_TRANSFER_TIMEOUT"] = str(
get_transfer_timeout_value())
if self._prefill_tp_size < self._decode_tp_size:
raise ValueError(
f"prefill_tp_size: {self._prefill_tp_size} must be greater than"

View File

@@ -3,6 +3,7 @@ import contextlib
import copy
import hashlib
import math
import os
import queue
import struct
import threading
@@ -31,6 +32,7 @@ from vllm.v1.core.sched.output import SchedulerOutput
import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.utils import (align_memory,
get_transfer_timeout_value,
kv_alltoall_and_rearrange)
if TYPE_CHECKING:
@@ -596,6 +598,8 @@ class MooncakeLayerwiseConnectorWorker:
def __init__(self, vllm_config: VllmConfig, engine_id: str):
self._get_prefill_decode_size(vllm_config)
os.environ["ASCEND_TRANSFER_TIMEOUT"] = str(
get_transfer_timeout_value())
if self._prefill_tp_size < self._decode_tp_size:
raise ValueError(
f"prefill_tp_size: {self._prefill_tp_size} must be greater than"

View File

@@ -1,3 +1,5 @@
import os
import torch
import torch.distributed as dist
@@ -45,3 +47,15 @@ def align_memory(tensor: torch.Tensor, alignment: int) -> torch.Tensor:
aligned_addr = (data_ptr + alignment - 1) // alignment * alignment
offset = (aligned_addr - data_ptr) // tensor.element_size()
return tensor[int(offset):]
def get_transfer_timeout_value():
ascend_transfer_timeout = os.getenv("ASCEND_TRANSFER_TIMEOUT", "")
if len(ascend_transfer_timeout) > 0:
return int(ascend_transfer_timeout)
hccl_rdma_timeout = int(os.getenv('HCCL_RDMA_TIMEOUT',
'20')) # type: ignore
hccl_rdma_retry_cnt = int(os.getenv('HCCL_RDMA_RETRY_CNT',
'7')) # type: ignore
return int((4.096 * (2**hccl_rdma_timeout)) * hccl_rdma_retry_cnt // 1000 +
3000)