diff --git a/examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py b/examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py
index 67c34ee..82e06e0 100644
--- a/examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py
+++ b/examples/disaggregated_prefill_v1/load_balance_proxy_layerwise_server_example.py
@@ -561,9 +561,12 @@ async def metaserver(request: Request):
                                       max_retries=global_args.max_retries,
                                       base_delay=global_args.retry_delay)
         proxy_state.release_prefiller(prefiller_idx, prefiller_score)
+        proxy_state.release_prefiller_kv(prefiller_idx, prefiller_score)
     except Exception as e:
         logger.error(f"Post metaserver failed with: {str(e)}")
+        proxy_state.release_prefiller(prefiller_idx, prefiller_score)
+        proxy_state.release_prefiller_kv(prefiller_idx, prefiller_score)
 
 
 if __name__ == '__main__':
diff --git a/tests/ut/kv_connector/test_mooncake_connector.py b/tests/ut/kv_connector/test_mooncake_connector.py
index 8226073..884f187 100644
--- a/tests/ut/kv_connector/test_mooncake_connector.py
+++ b/tests/ut/kv_connector/test_mooncake_connector.py
@@ -1018,7 +1018,9 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
         self.mock_transfer_engine.register_memory.return_value = 0
 
         self.patches = [
-            patch('os.getenv', return_value="10,11"),
+            patch(
+                'vllm_ascend.distributed.mooncake_connector.envs_ascend.PHYSICAL_DEVICES',
+                '10,11'),
             patch('torch.Tensor.size', return_value=(10, 16, 8, 16)),
             patch('torch.Tensor.element_size', return_value=4),
             patch('torch.Tensor.data_ptr', return_value=0x1000),
@@ -1047,8 +1049,6 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
                   MagicMock()),
             patch('vllm_ascend.distributed.mooncake_connector.threading.Event',
                   MagicMock()),
-            patch.dict('sys.modules',
-                       {'vllm_ascend.envs': self.envs_ascend_mock}),
         ]
 
         for p in self.patches:
@@ -1136,4 +1136,4 @@ class TestMooncakeConnectorWorker(unittest.TestCase):
 
 
 if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+    unittest.main()
diff --git a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py
index b454282..31611ac 100644
--- a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py
+++ b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py
@@ -800,7 +800,9 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
         self.mock_transfer_engine.register_memory.return_value = 0
 
         self.patches = [
-            patch('os.getenv', return_value="10,11"),
+            patch(
+                'vllm_ascend.distributed.mooncake_layerwise_connector.envs_ascend.PHYSICAL_DEVICES',
+                '10,11'),
             patch('torch.Tensor.size', return_value=(10, 16, 8, 16)),
             patch('torch.Tensor.element_size', return_value=4),
             patch('torch.Tensor.data_ptr', return_value=0x1000),
@@ -833,8 +835,6 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
             patch(
                 'vllm_ascend.distributed.mooncake_layerwise_connector.threading.Event',
                 MagicMock()),
-            patch.dict('sys.modules',
-                       {'vllm_ascend.envs': self.envs_ascend_mock}),
             patch(
                 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config',
                 return_value=SimpleNamespace(pd_tp_ratio=1,
@@ -913,4 +913,4 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase):
 
 
 if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+    unittest.main()
diff --git a/vllm_ascend/distributed/llmdatadist_c_mgr_connector.py b/vllm_ascend/distributed/llmdatadist_c_mgr_connector.py
index 1ec0311..0d96351 100644
--- a/vllm_ascend/distributed/llmdatadist_c_mgr_connector.py
+++ b/vllm_ascend/distributed/llmdatadist_c_mgr_connector.py
@@ -30,6 +30,7 @@ from vllm.v1.core.sched.output import SchedulerOutput
 from vllm.v1.request import Request, RequestStatus
 
 import vllm_ascend.envs as envs_ascend
+from vllm_ascend.distributed.utils import get_transfer_timeout_value
 from vllm_ascend.utils import AscendSocVersion, get_ascend_soc_version
 
 TORCH_DTYPE_TO_NPU_DTYPE = {
@@ -411,7 +412,7 @@ class LLMDataDistCMgrConnectorWorker():
         assert self.local_agent_metadata is not None
         llm_config = LLMConfig()
         llm_config.device_id = self.local_rank
-        llm_config.sync_kv_timeout = 20000
+        llm_config.sync_kv_timeout = get_transfer_timeout_value()
         llm_config.enable_switch_role = True
         llm_config.enable_cache_manager = True
         llm_config.enable_remote_cache_accessible = True
diff --git a/vllm_ascend/distributed/mooncake_connector.py b/vllm_ascend/distributed/mooncake_connector.py
index ca36581..4e6fca1 100644
--- a/vllm_ascend/distributed/mooncake_connector.py
+++ b/vllm_ascend/distributed/mooncake_connector.py
@@ -2,6 +2,7 @@
 import contextlib
 import hashlib
 import math
+import os
 import queue
 import random
 import struct
@@ -33,6 +34,7 @@ from vllm.v1.request import RequestStatus
 import vllm_ascend.envs as envs_ascend
 from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config
 from vllm_ascend.distributed.mooncake.transfer_engine import get_global_te
+from vllm_ascend.distributed.utils import get_transfer_timeout_value
 
 if TYPE_CHECKING:
     from vllm.attention.backends.abstract import AttentionMetadata
@@ -849,6 +851,8 @@ class MooncakeConnectorWorker:
 
     def __init__(self, vllm_config: VllmConfig, engine_id: str):
         self._get_prefill_decode_size(vllm_config)
+        os.environ["ASCEND_TRANSFER_TIMEOUT"] = str(
+            get_transfer_timeout_value())
         if self._prefill_tp_size < self._decode_tp_size:
             raise ValueError(
                 f"prefill_tp_size: {self._prefill_tp_size} must be greater than"
diff --git a/vllm_ascend/distributed/mooncake_layerwise_connector.py b/vllm_ascend/distributed/mooncake_layerwise_connector.py
index e500f53..79d9bdb 100644
--- a/vllm_ascend/distributed/mooncake_layerwise_connector.py
+++ b/vllm_ascend/distributed/mooncake_layerwise_connector.py
@@ -3,6 +3,7 @@ import contextlib
 import copy
 import hashlib
 import math
+import os
 import queue
 import struct
 import threading
@@ -31,6 +32,7 @@ from vllm.v1.core.sched.output import SchedulerOutput
 import vllm_ascend.envs as envs_ascend
 from vllm_ascend.ascend_config import get_ascend_config
 from vllm_ascend.distributed.utils import (align_memory,
+                                           get_transfer_timeout_value,
                                            kv_alltoall_and_rearrange)
 
 if TYPE_CHECKING:
@@ -596,6 +598,8 @@ class MooncakeLayerwiseConnectorWorker:
 
     def __init__(self, vllm_config: VllmConfig, engine_id: str):
         self._get_prefill_decode_size(vllm_config)
+        os.environ["ASCEND_TRANSFER_TIMEOUT"] = str(
+            get_transfer_timeout_value())
         if self._prefill_tp_size < self._decode_tp_size:
             raise ValueError(
                 f"prefill_tp_size: {self._prefill_tp_size} must be greater than"
diff --git a/vllm_ascend/distributed/utils.py b/vllm_ascend/distributed/utils.py
index 4b1344a..c25c1f1 100644
--- a/vllm_ascend/distributed/utils.py
+++ b/vllm_ascend/distributed/utils.py
@@ -1,3 +1,5 @@
+import os
+
 import torch
 import torch.distributed as dist
 
@@ -45,3 +47,15 @@ def align_memory(tensor: torch.Tensor, alignment: int) -> torch.Tensor:
     aligned_addr = (data_ptr + alignment - 1) // alignment * alignment
     offset = (aligned_addr - data_ptr) // tensor.element_size()
     return tensor[int(offset):]
+
+
+def get_transfer_timeout_value():
+    ascend_transfer_timeout = os.getenv("ASCEND_TRANSFER_TIMEOUT", "")
+    if len(ascend_transfer_timeout) > 0:
+        return int(ascend_transfer_timeout)
+    hccl_rdma_timeout = int(os.getenv('HCCL_RDMA_TIMEOUT',
+                                      '20'))  # type: ignore
+    hccl_rdma_retry_cnt = int(os.getenv('HCCL_RDMA_RETRY_CNT',
+                                        '7'))  # type: ignore
+    return int((4.096 * (2**hccl_rdma_timeout)) * hccl_rdma_retry_cnt // 1000
+               + 3000)
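
Note on the fallback in get_transfer_timeout_value(): when ASCEND_TRANSFER_TIMEOUT is unset, the value is derived from the standard InfiniBand/RoCE retransmission model, in which each RDMA attempt waits 4.096 us * 2**HCCL_RDMA_TIMEOUT before timing out and is retried HCCL_RDMA_RETRY_CNT times; the product is converted from microseconds to milliseconds and padded with a 3000 ms safety margin. A minimal sketch restating that arithmetic outside the diff (function and variable names here are illustrative, not part of the patch):

    # Fallback transfer timeout, in milliseconds, from the HCCL RDMA settings.
    def transfer_timeout_ms(timeout_exp: int = 20, retry_cnt: int = 7) -> int:
        per_attempt_us = 4.096 * (2 ** timeout_exp)  # IB ack timeout per attempt
        total_us = per_attempt_us * retry_cnt        # worst case across retries
        return int(total_us // 1000 + 3000)          # us -> ms, plus 3 s margin

    assert transfer_timeout_ms() == 33064  # defaults (20, 7) give roughly 33 s

With ASCEND_TRANSFER_TIMEOUT set, the computation is skipped and the explicit millisecond value is returned as-is; the Mooncake connector workers then write it back into os.environ so the underlying transfer engine observes the same setting.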
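
On the test-side change: the previous patch('os.getenv', return_value="10,11") intercepted every getenv call in the worker, and the patch.dict('sys.modules', ...) substitution swapped out the whole vllm_ascend.envs module. Patching envs_ascend.PHYSICAL_DEVICES through the connector module is narrower and is restored automatically, because mock.patch resolves the dotted path to the one shared module object that every importer references. A self-contained illustration of that mechanism (the fake_envs module is a stand-in, not from the PR):

    import sys
    import types
    from unittest import mock

    # Stand-in for a shared settings module such as vllm_ascend.envs.
    fake_envs = types.ModuleType("fake_envs")
    fake_envs.PHYSICAL_DEVICES = ""
    sys.modules["fake_envs"] = fake_envs

    # Every module that did "import fake_envs" holds this same object, so
    # patching the attribute is visible to all importers at once.
    with mock.patch("fake_envs.PHYSICAL_DEVICES", "10,11"):
        assert sys.modules["fake_envs"].PHYSICAL_DEVICES == "10,11"
    assert fake_envs.PHYSICAL_DEVICES == ""  # restored on context exit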