From 295018ec0f1bf940e7a89979145780629533bd48 Mon Sep 17 00:00:00 2001 From: lty Date: Thu, 15 Jan 2026 08:57:40 +0800 Subject: [PATCH] [Refactor]Refactor of vllm_ascend/distributed module (#5719) ### What this PR does / why we need it? Based on the RFC:https://github.com/vllm-project/vllm-ascend/issues/5604 This PR is a refactoring of vllm_ascend/distributed, moving all kv_transfer realtaed codes into a dedicated folder, which has already been done in vLLM ### Does this PR introduce _any_ user-facing change? NA ### How was this patch tested? - vLLM version: v0.13.0 - vLLM main: https://github.com/vllm-project/vllm/commit/2f4e6548efec402b913ffddc8726230d9311948d --------- Signed-off-by: lty --- .../contribution/multi_node_test.md | 2 +- docs/source/tutorials/DeepSeek-V3.1.md | 4 - docs/source/tutorials/DeepSeek-V3.2.md | 4 - docs/source/tutorials/Qwen3-235B-A22B.md | 3 - ...ng_sequence_context_parallel_multi_node.md | 3 - .../pd_disaggregation_mooncake_multi_node.md | 8 - .../pd_disaggregation_mooncake_single_node.md | 2 - .../deployment_guide/using_volcano_kthena.md | 4 +- .../feature_guide/large_scale_ep.md | 12 +- .../mooncake_connector_deployment_guide.md | 2 - examples/offline_disaggregated_prefill_npu.py | 2 - .../config/DeepSeek-R1-W8A8-EPLB.yaml | 4 - .../config/DeepSeek-R1-W8A8-longseq.yaml | 2 - .../multi_node/config/DeepSeek-R1-W8A8.yaml | 4 - .../multi_node/config/DeepSeek-V3.yaml | 2 - .../config/Qwen3-235B-W8A8-EPLB.yaml | 2 - .../config/Qwen3-235B-W8A8-longseq.yaml | 2 - .../multi_node/config/Qwen3-235B-W8A8.yaml | 2 - .../config/Qwen3-235B-disagg-pd.yaml | 2 - .../config/Qwen3-VL-235B-disagg-pd.yaml | 2 - .../distributed/mooncake/test_config_data.py | 2 +- tests/ut/distributed/test_communicator.py | 3 +- .../kv_connector/test_mooncake_connector.py | 145 +++++++++++------- .../test_mooncake_layerwise_connector.py | 101 +++++++----- tests/ut/kv_connector/utils.py | 3 +- tests/ut/test_platform.py | 2 +- vllm_ascend/__init__.py | 2 +- vllm_ascend/distributed/__init__.py | 44 ------ .../npu_communicator.py} | 0 .../distributed/kv_transfer/__init__.py | 45 ++++++ .../kv_p2p}/__init__.py | 0 .../kv_p2p}/mooncake_connector.py | 4 +- .../kv_p2p}/mooncake_layerwise_connector.py | 11 +- .../kv_transfer/kv_pool/__init__.py | 0 .../kv_pool/ascend_store/__init__.py | 0 .../ascend_store}/ascend_store_connector.py | 5 +- .../kv_pool/ascend_store/backend}/__init__.py | 0 .../kv_pool/ascend_store}/backend/backend.py | 0 .../ascend_store}/backend/memcache_backend.py | 3 +- .../ascend_store}/backend/mooncake_backend.py | 6 +- .../kv_pool/ascend_store}/config_data.py | 0 .../kv_pool/ascend_store}/kv_transfer.py | 5 +- .../kv_pool/ascend_store}/metadata.py | 2 +- .../kv_pool/ascend_store}/pool_scheduler.py | 2 +- .../kv_pool/ascend_store}/pool_worker.py | 11 +- .../kv_pool/cpu_offload/__init__.py | 0 .../cpu_offload}/cpu_kv_cache_manager.py | 0 .../cpu_offload}/cpu_offload_connector.py | 2 +- .../{ => kv_transfer}/ucm_connector.py | 0 .../distributed/kv_transfer/utils/__init__.py | 0 .../utils}/mooncake_transfer_engine.py | 0 .../distributed/kv_transfer/utils/utils.py | 61 ++++++++ .../distributed/kvpool/backend/__init__.py | 1 - vllm_ascend/distributed/utils.py | 60 +------- vllm_ascend/patch/worker/patch_distributed.py | 3 +- vllm_ascend/platform.py | 2 +- 56 files changed, 300 insertions(+), 293 deletions(-) rename vllm_ascend/distributed/{communicator.py => device_communicators/npu_communicator.py} (100%) create mode 100644 vllm_ascend/distributed/kv_transfer/__init__.py rename vllm_ascend/distributed/{cpu_offload_manager => kv_transfer/kv_p2p}/__init__.py (100%) rename vllm_ascend/distributed/{ => kv_transfer/kv_p2p}/mooncake_connector.py (99%) rename vllm_ascend/distributed/{ => kv_transfer/kv_p2p}/mooncake_layerwise_connector.py (99%) create mode 100644 vllm_ascend/distributed/kv_transfer/kv_pool/__init__.py create mode 100644 vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/__init__.py rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/ascend_store_connector.py (97%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store/backend}/__init__.py (100%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/backend/backend.py (100%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/backend/memcache_backend.py (97%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/backend/mooncake_backend.py (97%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/config_data.py (100%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/kv_transfer.py (98%) rename vllm_ascend/distributed/{cpu_offload_manager => kv_transfer/kv_pool/ascend_store}/metadata.py (99%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/pool_scheduler.py (99%) rename vllm_ascend/distributed/{kvpool => kv_transfer/kv_pool/ascend_store}/pool_worker.py (98%) create mode 100644 vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/__init__.py rename vllm_ascend/distributed/{cpu_offload_manager => kv_transfer/kv_pool/cpu_offload}/cpu_kv_cache_manager.py (100%) rename vllm_ascend/distributed/{ => kv_transfer/kv_pool/cpu_offload}/cpu_offload_connector.py (99%) rename vllm_ascend/distributed/{ => kv_transfer}/ucm_connector.py (100%) create mode 100644 vllm_ascend/distributed/kv_transfer/utils/__init__.py rename vllm_ascend/distributed/{ => kv_transfer/utils}/mooncake_transfer_engine.py (100%) create mode 100644 vllm_ascend/distributed/kv_transfer/utils/utils.py delete mode 100644 vllm_ascend/distributed/kvpool/backend/__init__.py diff --git a/docs/source/developer_guide/contribution/multi_node_test.md b/docs/source/developer_guide/contribution/multi_node_test.md index 43cfb408..fd972c51 100644 --- a/docs/source/developer_guide/contribution/multi_node_test.md +++ b/docs/source/developer_guide/contribution/multi_node_test.md @@ -295,7 +295,7 @@ This section assumes that you already have a [Kubernetes](https://kubernetes.io/ [2025-12-30 11:01:01] INFO multi_node_config.py:348: Resolving cluster IPs via DNS... [2025-12-30 11:01:01] INFO multi_node_config.py:212: Node 0 envs: {'VLLM_USE_MODELSCOPE': 'True', 'OMP_PROC_BIND': 'False', 'OMP_NUM_THREADS': '100', 'HCCL_BUFFSIZE': '1024', 'SERVER_PORT': '8080', 'NUMEXPR_MAX_THREADS': '128', 'DISAGGREGATED_PREFILL_PROXY_SCRIPT': 'examples/disaggregated_prefill_v1/load_balance_proxy_server_example.py', 'HCCL_IF_IP': '10.0.0.102', 'HCCL_SOCKET_IFNAME': 'eth0', 'GLOO_SOCKET_IFNAME': 'eth0', 'TP_SOCKET_IFNAME': 'eth0', 'LOCAL_IP': '10.0.0.102', 'NIC_NAME': 'eth0', 'MASTER_IP': '10.0.0.102'} [2025-12-30 11:01:01] INFO multi_node_config.py:159: Launching proxy: python examples/disaggregated_prefill_v1/load_balance_proxy_server_example.py --host 10.0.0.102 --port 6000 --prefiller-hosts 10.0.0.102 --prefiller-ports 8080 --decoder-hosts 10.0.0.138 --decoder-ports 8080 - [2025-12-30 11:01:01] INFO conftest.py:107: Starting server with command: vllm serve vllm-ascend/DeepSeek-V3-W8A8 --host 0.0.0.0 --port 8080 --data-parallel-size 2 --data-parallel-size-local 2 --tensor-parallel-size 8 --seed 1024 --enforce-eager --enable-expert-parallel --max-num-seqs 16 --max-model-len 8192 --max-num-batched-tokens 8192 --quantization ascend --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.9 --kv-transfer-config {"kv_connector": "MooncakeConnectorV1", "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { + [2025-12-30 11:01:01] INFO conftest.py:107: Starting server with command: vllm serve vllm-ascend/DeepSeek-V3-W8A8 --host 0.0.0.0 --port 8080 --data-parallel-size 2 --data-parallel-size-local 2 --tensor-parallel-size 8 --seed 1024 --enforce-eager --enable-expert-parallel --max-num-seqs 16 --max-model-len 8192 --max-num-batched-tokens 8192 --quantization ascend --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.9 --kv-transfer-config {"kv_connector": "MooncakeConnectorV1", "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", "kv_connector_extra_config": { "prefill": { "dp_size": 2, "tp_size": 8 diff --git a/docs/source/tutorials/DeepSeek-V3.1.md b/docs/source/tutorials/DeepSeek-V3.1.md index 274de110..7ecb55d0 100644 --- a/docs/source/tutorials/DeepSeek-V3.1.md +++ b/docs/source/tutorials/DeepSeek-V3.1.md @@ -326,7 +326,6 @@ vllm serve /weights/DeepSeek-V3.1-w8a8-mtp-QuaRot \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -402,7 +401,6 @@ vllm serve /weights/DeepSeek-V3.1-w8a8-mtp-QuaRot \ "kv_role": "kv_producer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -480,7 +478,6 @@ vllm serve /weights/DeepSeek-V3.1-w8a8-mtp-QuaRot \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -558,7 +555,6 @@ vllm serve /weights/DeepSeek-V3.1-w8a8-mtp-QuaRot \ "kv_role": "kv_consumer", "kv_port": "30300", "engine_id": "3", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/docs/source/tutorials/DeepSeek-V3.2.md b/docs/source/tutorials/DeepSeek-V3.2.md index e81e6bb8..54150709 100644 --- a/docs/source/tutorials/DeepSeek-V3.2.md +++ b/docs/source/tutorials/DeepSeek-V3.2.md @@ -316,7 +316,6 @@ Before you start, please "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -391,7 +390,6 @@ Before you start, please "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -469,7 +467,6 @@ Before you start, please "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -548,7 +545,6 @@ Before you start, please "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { diff --git a/docs/source/tutorials/Qwen3-235B-A22B.md b/docs/source/tutorials/Qwen3-235B-A22B.md index 9b534c16..c0e3f746 100644 --- a/docs/source/tutorials/Qwen3-235B-A22B.md +++ b/docs/source/tutorials/Qwen3-235B-A22B.md @@ -450,7 +450,6 @@ vllm serve vllm-ascend/Qwen3-235B-A22B-w8a8 \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", -"kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -516,7 +515,6 @@ vllm serve vllm-ascend/Qwen3-235B-A22B-w8a8 \ "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", -"kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -583,7 +581,6 @@ vllm serve vllm-ascend/Qwen3-235B-A22B-w8a8 \ "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", -"kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { diff --git a/docs/source/tutorials/long_sequence_context_parallel_multi_node.md b/docs/source/tutorials/long_sequence_context_parallel_multi_node.md index 375704d5..c51b3592 100644 --- a/docs/source/tutorials/long_sequence_context_parallel_multi_node.md +++ b/docs/source/tutorials/long_sequence_context_parallel_multi_node.md @@ -123,7 +123,6 @@ vllm serve /path_to_weight/DeepSeek-V3.1_w8a8mix_mtp \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -192,7 +191,6 @@ vllm serve /path_to_weight/DeepSeek-V3.1_w8a8mix_mtp \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -259,7 +257,6 @@ vllm serve /path_to_weight/DeepSeek-V3.1_w8a8mix_mtp \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "3", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, diff --git a/docs/source/tutorials/pd_disaggregation_mooncake_multi_node.md b/docs/source/tutorials/pd_disaggregation_mooncake_multi_node.md index a5a060c6..a338c129 100644 --- a/docs/source/tutorials/pd_disaggregation_mooncake_multi_node.md +++ b/docs/source/tutorials/pd_disaggregation_mooncake_multi_node.md @@ -280,7 +280,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -340,7 +339,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_producer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -401,7 +399,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -461,7 +458,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", "kv_connector_extra_config": { "prefill": { @@ -529,7 +525,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -589,7 +584,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_producer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -650,7 +644,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -710,7 +703,6 @@ vllm serve /path_to_weight/DeepSeek-r1_w8a8_mtp \ "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/docs/source/tutorials/pd_disaggregation_mooncake_single_node.md b/docs/source/tutorials/pd_disaggregation_mooncake_single_node.md index a7a16feb..dab481fa 100644 --- a/docs/source/tutorials/pd_disaggregation_mooncake_single_node.md +++ b/docs/source/tutorials/pd_disaggregation_mooncake_single_node.md @@ -173,7 +173,6 @@ vllm serve /model/Qwen2.5-VL-7B-Instruct \ "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, @@ -216,7 +215,6 @@ vllm serve /model/Qwen2.5-VL-7B-Instruct \ "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, diff --git a/docs/source/user_guide/deployment_guide/using_volcano_kthena.md b/docs/source/user_guide/deployment_guide/using_volcano_kthena.md index a5aa29d2..1c1e95f5 100644 --- a/docs/source/user_guide/deployment_guide/using_volcano_kthena.md +++ b/docs/source/user_guide/deployment_guide/using_volcano_kthena.md @@ -137,7 +137,7 @@ spec: - "--trust-remote-code" - "--enforce-eager" - "--kv-transfer-config" - - '{"kv_connector":"MooncakeConnectorV1","kv_buffer_device":"npu","kv_role":"kv_producer","kv_parallel_size":1,"kv_port":"20001","engine_id":"0","kv_rank":0,"kv_connector_module_path":"vllm_ascend.distributed.mooncake_connector","kv_connector_extra_config":{"prefill":{"dp_size":2,"tp_size":2},"decode":{"dp_size":2,"tp_size":2}}}' + - '{"kv_connector":"MooncakeConnectorV1","kv_buffer_device":"npu","kv_role":"kv_producer","kv_parallel_size":1,"kv_port":"20001","engine_id":"0","kv_rank":0,"kv_connector_extra_config":{"prefill":{"dp_size":2,"tp_size":2},"decode":{"dp_size":2,"tp_size":2}}}' imagePullPolicy: Always resources: limits: @@ -240,7 +240,7 @@ spec: - "--no-enable-prefix-caching" - "--enforce-eager" - "--kv-transfer-config" - - '{"kv_connector":"MooncakeConnectorV1","kv_buffer_device":"npu","kv_role":"kv_consumer","kv_parallel_size":1,"kv_port":"20002","engine_id":"1","kv_rank":1,"kv_connector_module_path":"vllm_ascend.distributed.mooncake_connector","kv_connector_extra_config":{"prefill":{"dp_size":2,"tp_size":2},"decode":{"dp_size":2,"tp_size":2}}}' + - '{"kv_connector":"MooncakeConnectorV1","kv_buffer_device":"npu","kv_role":"kv_consumer","kv_parallel_size":1,"kv_port":"20002","engine_id":"1","kv_rank":1,"kv_connector_extra_config":{"prefill":{"dp_size":2,"tp_size":2},"decode":{"dp_size":2,"tp_size":2}}}' imagePullPolicy: Always resources: limits: diff --git a/docs/source/user_guide/feature_guide/large_scale_ep.md b/docs/source/user_guide/feature_guide/large_scale_ep.md index 01a52434..b0247394 100644 --- a/docs/source/user_guide/feature_guide/large_scale_ep.md +++ b/docs/source/user_guide/feature_guide/large_scale_ep.md @@ -163,8 +163,7 @@ vllm serve vllm-ascend/DeepSeek-R1-W8A8 \ "kv_role": "kv_producer", "kv_parallel_size": "1", "kv_port": "20001", - "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector" + "engine_id": "0" }' --additional-config '{"enable_weight_nz_layout":true,"enable_prefill_optimizations":true}' ``` @@ -230,8 +229,7 @@ vllm serve vllm-ascend/DeepSeek-R1-W8A8 \ "kv_role": "kv_consumer", "kv_parallel_size": "1", "kv_port": "20001", - "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector" + "engine_id": "0" }' \ --additional-config '{"enable_weight_nz_layout":true}' ``` @@ -435,8 +433,7 @@ In the PD separation scenario, we provide a optimized configuration. "kv_role": "kv_producer", "kv_parallel_size": "1", "kv_port": "20001", - "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector" + "engine_id": "0" }' ``` @@ -458,8 +455,7 @@ In the PD separation scenario, we provide a optimized configuration. "kv_role": "kv_consumer", "kv_parallel_size": "1", "kv_port": "20001", - "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector" + "engine_id": "0" }' ``` diff --git a/examples/disaggregated_prefill_v1/mooncake_connector_deployment_guide.md b/examples/disaggregated_prefill_v1/mooncake_connector_deployment_guide.md index e9c35ac5..19b70791 100644 --- a/examples/disaggregated_prefill_v1/mooncake_connector_deployment_guide.md +++ b/examples/disaggregated_prefill_v1/mooncake_connector_deployment_guide.md @@ -55,7 +55,6 @@ vllm serve "/xxxxx/DeepSeek-V2-Lite-Chat" \ "kv_port": "20001", "engine_id": "0", "kv_rank": 0, - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -123,7 +122,6 @@ vllm serve "/xxxxx/DeepSeek-V2-Lite-Chat" \ "kv_port": "20002", "engine_id": "1", "kv_rank": 1, - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/examples/offline_disaggregated_prefill_npu.py b/examples/offline_disaggregated_prefill_npu.py index 142c1c79..060d9021 100644 --- a/examples/offline_disaggregated_prefill_npu.py +++ b/examples/offline_disaggregated_prefill_npu.py @@ -56,7 +56,6 @@ def run_prefill(prefill_done, process_close): kv_role="kv_producer", kv_port="30000", engine_id="0", - kv_connector_module_path="vllm_ascend.distributed.mooncake_connector", kv_connector_extra_config={"prefill": {"dp_size": 1, "tp_size": 1}, "decode": {"dp_size": 1, "tp_size": 1}}, ) # Set NPU memory utilization to 0.8 @@ -104,7 +103,6 @@ def run_decode(prefill_done): kv_role="kv_consumer", kv_port="30100", engine_id="1", - kv_connector_module_path="vllm_ascend.distributed.mooncake_connector", kv_connector_extra_config={"prefill": {"dp_size": 1, "tp_size": 1}, "decode": {"dp_size": 1, "tp_size": 1}}, ) diff --git a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml index 37195ad5..8eae6d2e 100644 --- a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml @@ -43,7 +43,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -81,7 +80,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -120,7 +118,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -158,7 +155,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-longseq.yaml b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-longseq.yaml index 8b91daa1..5cbc108c 100644 --- a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-longseq.yaml +++ b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-longseq.yaml @@ -46,7 +46,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, @@ -85,7 +84,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, diff --git a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8.yaml b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8.yaml index 660120f3..50f5831c 100644 --- a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8.yaml +++ b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8.yaml @@ -44,7 +44,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -83,7 +82,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -124,7 +122,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -168,7 +165,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "2", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/e2e/nightly/multi_node/config/DeepSeek-V3.yaml b/tests/e2e/nightly/multi_node/config/DeepSeek-V3.yaml index f3fb8a24..244d1cce 100644 --- a/tests/e2e/nightly/multi_node/config/DeepSeek-V3.yaml +++ b/tests/e2e/nightly/multi_node/config/DeepSeek-V3.yaml @@ -53,7 +53,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -88,7 +87,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml index 6abdd0b5..a8df4407 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml @@ -39,7 +39,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -76,7 +75,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-longseq.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-longseq.yaml index f4cacda4..0477a349 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-longseq.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-longseq.yaml @@ -43,7 +43,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, @@ -83,7 +82,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 1, diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8.yaml index 3572dbbc..a5bdbef5 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8.yaml @@ -39,7 +39,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -74,7 +73,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-235B-disagg-pd.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-235B-disagg-pd.yaml index f5f63f94..4139a35d 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-235B-disagg-pd.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-235B-disagg-pd.yaml @@ -44,7 +44,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { @@ -84,7 +83,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30100", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "use_ascend_direct": true, "prefill": { diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-VL-235B-disagg-pd.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-VL-235B-disagg-pd.yaml index 3c58065a..d3577c78 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-VL-235B-disagg-pd.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-VL-235B-disagg-pd.yaml @@ -39,7 +39,6 @@ deployment: "kv_role": "kv_producer", "kv_port": "30000", "engine_id": "0", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, @@ -73,7 +72,6 @@ deployment: "kv_role": "kv_consumer", "kv_port": "30200", "engine_id": "1", - "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", "kv_connector_extra_config": { "prefill": { "dp_size": 2, diff --git a/tests/ut/distributed/mooncake/test_config_data.py b/tests/ut/distributed/mooncake/test_config_data.py index 57f09f07..ac13dad1 100644 --- a/tests/ut/distributed/mooncake/test_config_data.py +++ b/tests/ut/distributed/mooncake/test_config_data.py @@ -10,7 +10,7 @@ fake_store = types.ModuleType("mooncake.store") fake_store.ReplicateConfig = MagicMock() # type: ignore[attr-defined] sys.modules["mooncake.store"] = fake_store -from vllm_ascend.distributed.kvpool.backend.mooncake_backend import ( # noqa: E402 +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.mooncake_backend import ( # noqa: E402 _convert_to_bytes, _parse_global_segment_size) diff --git a/tests/ut/distributed/test_communicator.py b/tests/ut/distributed/test_communicator.py index edaae2a1..c929741e 100644 --- a/tests/ut/distributed/test_communicator.py +++ b/tests/ut/distributed/test_communicator.py @@ -4,7 +4,8 @@ from unittest.mock import MagicMock, patch import torch import torch.distributed as dist -from vllm_ascend.distributed.communicator import NPUCommunicator +from vllm_ascend.distributed.device_communicators.npu_communicator import \ + NPUCommunicator class TestNPUCommunicator(unittest.TestCase): diff --git a/tests/ut/kv_connector/test_mooncake_connector.py b/tests/ut/kv_connector/test_mooncake_connector.py index b09174a9..8cd1e6af 100644 --- a/tests/ut/kv_connector/test_mooncake_connector.py +++ b/tests/ut/kv_connector/test_mooncake_connector.py @@ -23,21 +23,24 @@ _mock_pp_group = MagicMock(rank_in_group=0, world_size=1) _mock_tp_group = MagicMock(rank_in_group=0, world_size=4) _mock_pcp_group = MagicMock(rank_in_group=0, world_size=1) _mock_dcp_group = MagicMock(rank_in_group=0, world_size=1) -patch('vllm_ascend.distributed.mooncake_connector.get_pp_group', - return_value=_mock_pp_group).start() -patch('vllm_ascend.distributed.mooncake_connector.get_tp_group', - return_value=_mock_tp_group).start() patch( - 'vllm_ascend.distributed.mooncake_connector.get_tensor_model_parallel_world_size', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_pp_group', + return_value=_mock_pp_group).start() +patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_tp_group', + return_value=_mock_tp_group).start() +patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_tensor_model_parallel_world_size', return_value=4).start() patch( - 'vllm_ascend.distributed.mooncake_connector.get_tensor_model_parallel_rank', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_tensor_model_parallel_rank', return_value=0).start() -patch('vllm_ascend.distributed.mooncake_connector.get_pcp_group', - return_value=_mock_pcp_group).start() +patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_pcp_group', + return_value=_mock_pcp_group).start() patch('vllm.distributed.parallel_state._DCP', _mock_dcp_group).start() -from vllm_ascend.distributed.mooncake_connector import ( # noqa: E402 +from vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector import ( # noqa: E402 KVCacheRecvingThread, KVCacheSendingThread, KVCacheTaskTracker, KVConnectorRole, MooncakeAgentMetadata, MooncakeConnector, MooncakeConnectorMetadata, MooncakeConnectorScheduler, @@ -81,7 +84,8 @@ class TestGetAndClearFinishedSingleRequests(unittest.TestCase): self.assertSetEqual(result, {"req_1", "req_2", "req_3"}) self.assertEqual(len(self.tracker.finished_requests), 0) - @patch("vllm_ascend.distributed.mooncake_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger") def test_concurrent_access(self, mock_logger): from concurrent.futures import ThreadPoolExecutor self.tracker.finished_requests = {"req_1", "req_2"} @@ -307,8 +311,12 @@ class TestSocketManagement(unittest.TestCase): self.thread.remote_sockets = defaultdict(deque) self.thread.remote_poller = MagicMock() - @patch('vllm_ascend.distributed.mooncake_connector.zmq.Context') - @patch('vllm_ascend.distributed.mooncake_connector.make_zmq_socket') + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.zmq.Context' + ) + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.make_zmq_socket' + ) def test_get_remote_socket(self, mock_make_socket, mock_context): mock_sock = MagicMock() mock_make_socket.return_value = mock_sock @@ -402,7 +410,7 @@ class TestCoreFunctionality(unittest.TestCase): @patch.object(KVCacheRecvingThread, '_get_remote_metadata') def test_transfer_kv_cache(self, mock_get_meta): with patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config' ) as mock_config: mock_config.return_value.enable_kv_nz = False self.thread.kv_caches_base_addr["remote_engine"] = { @@ -456,8 +464,12 @@ class TestMetadataHandling(unittest.TestCase): kv_caches_base_addr=[0x3000, 0x4000], num_blocks=2) - @patch('vllm_ascend.distributed.mooncake_connector.ensure_zmq_send') - @patch('vllm_ascend.distributed.mooncake_connector.ensure_zmq_recv') + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.ensure_zmq_send' + ) + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.ensure_zmq_recv' + ) def test_get_remote_metadata_success(self, mock_recv, mock_send): mock_recv.return_value = msgspec.msgpack.encode(self.test_metadata) @@ -479,9 +491,12 @@ class TestMetadataHandling(unittest.TestCase): self.thread.kv_caches_base_addr["remote_engine"][5555], [0x3000, 0x4000]) - @patch('vllm_ascend.distributed.mooncake_connector.ensure_zmq_send') - @patch('vllm_ascend.distributed.mooncake_connector.ensure_zmq_recv', - side_effect=Exception("Network error")) + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.ensure_zmq_send' + ) + @patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.ensure_zmq_recv', + side_effect=Exception("Network error")) def test_get_remote_metadata_failure(self, mock_recv, mock_send): with patch.object(self.thread, '_get_remote_socket') as mock_get_socket, \ patch.object(self.thread, '_return_remote_socket') as mock_return_socket: @@ -694,10 +709,10 @@ class TestMooncakeConnectorSchedulerMatchedTokens(unittest.TestCase): def setUp(self): config = MockVllmConfig() self.p1 = patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config', new=MagicMock()) self.p2 = patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', new=MagicMock(return_value=MagicMock())) self.p1.start() self.p2.start() @@ -775,9 +790,9 @@ class TestMooncakeConnectorForScheduler(unittest.TestCase): def test_scheduler_role(self): config = MockVllmConfig() with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(config, KVConnectorRole.SCHEDULER) self.assertIsNotNone(connector.connector_scheduler) @@ -787,9 +802,9 @@ class TestMooncakeConnectorForScheduler(unittest.TestCase): def test_scheduler_methods(self, mock_method): config = MockVllmConfig() with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(config, KVConnectorRole.SCHEDULER) request = MockRequest("req1") @@ -819,9 +834,9 @@ class TestMooncakeConnector(unittest.TestCase): def test_scheduler_initialization(self): with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(self.config, KVConnectorRole.SCHEDULER) @@ -831,9 +846,9 @@ class TestMooncakeConnector(unittest.TestCase): @patch.object(MooncakeConnectorScheduler, "get_num_new_matched_tokens") def test_get_num_new_matched_tokens(self, mock_method): with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(self.config, KVConnectorRole.SCHEDULER) @@ -844,9 +859,9 @@ class TestMooncakeConnector(unittest.TestCase): @patch.object(MooncakeConnectorScheduler, "update_state_after_alloc") def test_update_state_after_alloc(self, mock_method): with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(self.config, KVConnectorRole.SCHEDULER) @@ -858,9 +873,9 @@ class TestMooncakeConnector(unittest.TestCase): @patch.object(MooncakeConnectorScheduler, "build_connector_meta") def test_build_connector_meta(self, mock_method): with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(self.config, KVConnectorRole.SCHEDULER) @@ -871,9 +886,9 @@ class TestMooncakeConnector(unittest.TestCase): @patch.object(MooncakeConnectorScheduler, "request_finished") def test_request_finished(self, mock_method): with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): connector = MooncakeConnector(self.config, KVConnectorRole.SCHEDULER) @@ -887,9 +902,9 @@ class TestMooncakeConnectorScheduler(unittest.TestCase): def setUp(self): self.config = MockVllmConfig() with patch( - 'vllm_ascend.distributed.mooncake_connector.init_ascend_config' + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.init_ascend_config' ), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()): self.scheduler = MooncakeConnectorScheduler( self.config, "test_engine") @@ -965,20 +980,24 @@ class TestUtils(unittest.TestCase): with zmq_ctx("INVALID", "tcp://127.0.0.1:5555"): pass - @patch("vllm_ascend.distributed.mooncake_connector.make_zmq_socket") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.make_zmq_socket" + ) def test_zmq_ctx_ok(self, mock_make_socket): mock_socket = MagicMock() mock_make_socket.return_value = mock_socket with zmq_ctx(zmq.REQ, "tcp://localhost:1234") as s: # type: ignore self.assertEqual(s, mock_socket) - @patch("vllm_ascend.distributed.mooncake_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger") def test_ensure_zmq_send_success(self, mock_logger): mock_socket = MagicMock() ensure_zmq_send(mock_socket, b"hello") mock_socket.send.assert_called_once_with(b"hello") - @patch("vllm_ascend.distributed.mooncake_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger") def test_ensure_zmq_send_retry_and_fail(self, mock_logger): mock_socket = MagicMock() mock_socket.send.side_effect = zmq.ZMQError( # type: ignore @@ -987,7 +1006,8 @@ class TestUtils(unittest.TestCase): ensure_zmq_send(mock_socket, b"hello", max_retries=2) self.assertEqual(mock_socket.send.call_count, 2) - @patch("vllm_ascend.distributed.mooncake_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger") def test_ensure_zmq_recv_success(self, mock_logger): mock_socket = MagicMock() mock_socket.recv.return_value = b"response" @@ -998,7 +1018,8 @@ class TestUtils(unittest.TestCase): data = ensure_zmq_recv(mock_socket, mock_poller) self.assertEqual(data, b"response") - @patch("vllm_ascend.distributed.mooncake_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger") def test_ensure_zmq_recv_timeout_and_fail(self, mock_logger): mock_socket = MagicMock() mock_poller = MagicMock() @@ -1106,35 +1127,40 @@ class TestMooncakeConnectorWorker(unittest.TestCase): patch('torch.Tensor.data_ptr', return_value=0x1000), patch('math.prod', return_value=128), patch( - 'vllm_ascend.distributed.mooncake_connector.get_tensor_model_parallel_rank', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_tensor_model_parallel_rank', mock_get_tensor_model_parallel_rank), - patch('vllm_ascend.distributed.mooncake_connector.get_tp_group', - mock_get_tp_group), - patch('vllm_ascend.distributed.mooncake_connector.get_pp_group', - return_value=_mock_pp_group), - patch('vllm_ascend.distributed.mooncake_connector.get_ip', - mock_get_ip), patch( - 'vllm_ascend.distributed.mooncake_connector.string_to_int64_hash', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_tp_group', + mock_get_tp_group), + patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_pp_group', + return_value=_mock_pp_group), + patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ip', + mock_get_ip), + patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.string_to_int64_hash', mock_string_to_int64_hash), patch( - 'vllm_ascend.distributed.mooncake_connector.global_te.get_transfer_engine', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.global_te.get_transfer_engine', return_value=self.mock_transfer_engine), patch( - 'vllm_ascend.distributed.mooncake_connector.global_te.register_buffer', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.global_te.register_buffer', return_value=None), patch( - 'vllm_ascend.distributed.mooncake_connector.KVCacheSendingThread', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.KVCacheSendingThread', MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_connector.KVCacheRecvingThread', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.KVCacheRecvingThread', MagicMock()), - patch('vllm_ascend.distributed.mooncake_connector.logger', - MagicMock()), - patch('vllm_ascend.distributed.mooncake_connector.threading.Event', - MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.logger', + MagicMock()), + patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.threading.Event', + MagicMock()), + patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()), ] @@ -1186,7 +1212,8 @@ class TestMooncakeConnectorWorker(unittest.TestCase): def get_tp_rank(prefill_tp_size: int, prefill_pp_size: int, decode_tp_size: int, num_kv_heads: int, tp_num_need_pulls: int, is_deepseek_mla: bool): - with patch('vllm_ascend.distributed.mooncake_connector.get_ascend_config', + with patch( + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector.get_ascend_config', return_value=MagicMock()), \ patch.object(self.vllm_config.kv_transfer_config, 'get_from_extra_config', side_effect=lambda k, d=None: { diff --git a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py index 114a224f..7ea4b142 100644 --- a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py +++ b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py @@ -15,7 +15,7 @@ fake_engine = types.ModuleType("mooncake.engine") fake_engine.TransferEngine = MagicMock() # type: ignore[attr-defined] sys.modules["mooncake.engine"] = fake_engine -from vllm_ascend.distributed.mooncake_layerwise_connector import ( # noqa: E402 +from vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector import ( # noqa: E402 KVCacheRecvingLayerThread, KVCacheSendingLayerThread, KVConnectorRole, MooncakeAgentMetadata, MooncakeLayerwiseConnector, MooncakeLayerwiseConnectorMetadata, MooncakeLayerwiseConnectorScheduler, @@ -81,19 +81,20 @@ class TestKVCacheSendingLayerThread(unittest.TestCase): chunk_finish=False) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.npu_stream_switch", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.npu_stream_switch", side_effect=lambda *_args, **_kwargs: contextlib.nullcontext()) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.torch.Tensor.data_ptr", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.torch.Tensor.data_ptr", autospec=True, return_value=0x200000) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.align_memory", - side_effect=lambda x, _align: x) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.torch.npu.synchronize" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.align_memory", + side_effect=lambda x, _align: x) + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.torch.npu.synchronize" ) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.group_concurrent_contiguous" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.group_concurrent_contiguous" ) def test_transfer_pd_gt1_uses_buffers_and_calls_engine( self, mock_group, _mock_sync, _mock_align, _mock_dataptr, @@ -171,10 +172,10 @@ class TestKVCacheSendingLayerThread(unittest.TestCase): self.engine.batch_transfer_sync_write.assert_not_called() @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.group_concurrent_contiguous", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.group_concurrent_contiguous", side_effect=group_concurrent_contiguous) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.torch.npu.synchronize" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.torch.npu.synchronize" ) def test_callback_invoked_on_final_layer(self, _mock_sync, _mock_group): @@ -250,21 +251,27 @@ class TestKVCacheRecvingLayerThread(unittest.TestCase): self.assertNotIn("reqX", th.task_tracker) self.assertIn("reqX", th.done_requests) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.get_ip", - return_value="127.0.0.1") @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.make_zmq_socket") + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" + ) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.make_zmq_path", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_ip", + return_value="127.0.0.1") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.make_zmq_socket" + ) + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.make_zmq_path", side_effect=lambda proto, host, port: f"{proto}://{host}:{port}") @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.msgspec.msgpack.Decoder" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.msgspec.msgpack.Decoder" ) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.msgspec.msgpack.Encoder" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.msgspec.msgpack.Encoder" + ) + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.zmq_ctx" ) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.zmq_ctx") def test_run_loop_handles_meta_done_invalid_unexpected_and_ack( self, mock_zmq_ctx, mock_Encoder, mock_Decoder, _mock_make_path, _mock_make_sock, _mock_get_ip, mock_logger): @@ -330,16 +337,21 @@ class TestKVCacheRecvingLayerThread(unittest.TestCase): finished = th.get_and_clear_finished_requests() self.assertIn("reqA", finished) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.get_ip", - return_value="127.0.0.1") @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.msgspec.msgpack.Decoder" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" ) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.msgspec.msgpack.Encoder" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_ip", + return_value="127.0.0.1") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.msgspec.msgpack.Decoder" + ) + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.msgspec.msgpack.Encoder" + ) + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.zmq_ctx" ) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.zmq_ctx") def test_run_loop_pd_head_ratio_gt1_requires_multiple_done( self, mock_zmq_ctx, mock_Encoder, mock_Decoder, _mock_get_ip, _mock_logger): @@ -623,7 +635,7 @@ class TestMooncakeLayerwiseConnectorScheduler_More(unittest.TestCase): self.assertEqual(len(meta.requests), 0) @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.group_concurrent_contiguous" + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.group_concurrent_contiguous" ) def test_build_connector_meta_emits_when_tokens_reach_total( self, mock_group_concurrent_contiguous): @@ -707,20 +719,25 @@ class TestHelperFunctions(unittest.TestCase): pass @patch( - "vllm_ascend.distributed.mooncake_layerwise_connector.make_zmq_socket") + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.make_zmq_socket" + ) def test_zmq_ctx_ok(self, mock_make_socket): mock_socket = MagicMock() mock_make_socket.return_value = mock_socket with zmq_ctx(zmq.REQ, "tcp://localhost:1234") as s: # type: ignore self.assertEqual(s, mock_socket) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" + ) def test_ensure_zmq_send_success(self, _): mock_socket = MagicMock() ensure_zmq_send(mock_socket, b"hello") mock_socket.send.assert_called_once_with(b"hello") - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" + ) def test_ensure_zmq_send_retry_and_fail(self, _): mock_socket = MagicMock() mock_socket.send.side_effect = zmq.ZMQError( # type: ignore @@ -729,7 +746,9 @@ class TestHelperFunctions(unittest.TestCase): ensure_zmq_send(mock_socket, b"hello", max_retries=2) self.assertEqual(mock_socket.send.call_count, 2) - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" + ) def test_ensure_zmq_recv_success(self, _): mock_socket = MagicMock() mock_socket.recv.return_value = b"response" @@ -740,7 +759,9 @@ class TestHelperFunctions(unittest.TestCase): data = ensure_zmq_recv(mock_socket, mock_poller) self.assertEqual(data, b"response") - @patch("vllm_ascend.distributed.mooncake_layerwise_connector.logger") + @patch( + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger" + ) def test_ensure_zmq_recv_timeout_and_fail(self, _): mock_socket = MagicMock() mock_poller = MagicMock() @@ -849,37 +870,37 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase): patch('math.prod', return_value=128), patch('random.Random'), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.get_tensor_model_parallel_rank', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_tensor_model_parallel_rank', return_value=0), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.get_tp_group', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_tp_group', return_value=None), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ip', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_ip', return_value="127.0.0.1"), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.string_to_int64_hash', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.string_to_int64_hash', side_effect=lambda s: hash(s)), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.global_te.get_transfer_engine', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.global_te.get_transfer_engine', return_value=self.mock_transfer_engine), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.global_te.register_buffer', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.global_te.register_buffer', return_value=None), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.KVCacheSendingLayerThread', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.KVCacheSendingLayerThread', MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.KVCacheRecvingLayerThread', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.KVCacheRecvingLayerThread', MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.logger', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.logger', MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.threading.Event', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.threading.Event', MagicMock()), patch( - 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config', + 'vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector.get_ascend_config', return_value=SimpleNamespace(pd_tp_ratio=1, num_head_replica=1, pd_head_ratio=1)), diff --git a/tests/ut/kv_connector/utils.py b/tests/ut/kv_connector/utils.py index 0723eb01..582830d8 100644 --- a/tests/ut/kv_connector/utils.py +++ b/tests/ut/kv_connector/utils.py @@ -79,8 +79,7 @@ def create_vllm_config( ) kv_transfer_config = KVTransferConfig( kv_connector="MooncakeConnectorV1", - kv_role="kv_both", - kv_connector_module_path="vllm_ascend.distributed.mooncake_connector") + kv_role="kv_both") return VllmConfig(scheduler_config=scheduler_config, model_config=model_config, cache_config=cache_config, diff --git a/tests/ut/test_platform.py b/tests/ut/test_platform.py index 6f2eeec1..2646409c 100644 --- a/tests/ut/test_platform.py +++ b/tests/ut/test_platform.py @@ -461,7 +461,7 @@ class TestNPUPlatform(TestBase): def test_get_device_communicator_cls_returns_correct_value(self): self.assertEqual( self.platform.get_device_communicator_cls(), - "vllm_ascend.distributed.communicator.NPUCommunicator", + "vllm_ascend.distributed.device_communicators.npu_communicator.NPUCommunicator", ) def test_is_pin_memory_available_returns_true(self): diff --git a/vllm_ascend/__init__.py b/vllm_ascend/__init__.py index 117859df..997f97cf 100644 --- a/vllm_ascend/__init__.py +++ b/vllm_ascend/__init__.py @@ -23,7 +23,7 @@ def register(): def register_connector(): - from vllm_ascend.distributed import register_connector + from vllm_ascend.distributed.kv_transfer import register_connector register_connector() diff --git a/vllm_ascend/distributed/__init__.py b/vllm_ascend/distributed/__init__.py index 91743a8f..e69de29b 100644 --- a/vllm_ascend/distributed/__init__.py +++ b/vllm_ascend/distributed/__init__.py @@ -1,44 +0,0 @@ -# -# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# This file is a part of the vllm-ascend project. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from vllm.distributed.kv_transfer.kv_connector.factory import \ - KVConnectorFactory - - -def register_connector(): - KVConnectorFactory.register_connector( - "MooncakeConnectorV1", "vllm_ascend.distributed.mooncake_connector", - "MooncakeConnector") - - KVConnectorFactory.register_connector( - "MooncakeConnectorStoreV1", - "vllm_ascend.distributed.kvpool.ascend_store_connector", - "AscendStoreConnector") - - KVConnectorFactory.register_connector( - "AscendStoreConnector", - "vllm_ascend.distributed.kvpool.ascend_store_connector", - "AscendStoreConnector") - - KVConnectorFactory.register_connector( - "MooncakeLayerwiseConnector", - "vllm_ascend.distributed.mooncake_layerwise_connector", - "MooncakeLayerwiseConnector") - - KVConnectorFactory.register_connector( - "UCMConnector", "vllm_ascend.distributed.ucm_connector", - "UCMConnectorV1") diff --git a/vllm_ascend/distributed/communicator.py b/vllm_ascend/distributed/device_communicators/npu_communicator.py similarity index 100% rename from vllm_ascend/distributed/communicator.py rename to vllm_ascend/distributed/device_communicators/npu_communicator.py diff --git a/vllm_ascend/distributed/kv_transfer/__init__.py b/vllm_ascend/distributed/kv_transfer/__init__.py new file mode 100644 index 00000000..7e3a0618 --- /dev/null +++ b/vllm_ascend/distributed/kv_transfer/__init__.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. +# This file is a part of the vllm-ascend project. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from vllm.distributed.kv_transfer.kv_connector.factory import \ + KVConnectorFactory + + +def register_connector(): + KVConnectorFactory.register_connector( + "MooncakeConnectorV1", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector", + "MooncakeConnector") + + KVConnectorFactory.register_connector( + "MooncakeConnectorStoreV1", + "vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.ascend_store_connector", + "AscendStoreConnector") + + KVConnectorFactory.register_connector( + "AscendStoreConnector", + "vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.ascend_store_connector", + "AscendStoreConnector") + + KVConnectorFactory.register_connector( + "MooncakeLayerwiseConnector", + "vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_layerwise_connector", + "MooncakeLayerwiseConnector") + + KVConnectorFactory.register_connector( + "UCMConnector", "vllm_ascend.distributed.kv_transfer.ucm_connector", + "UCMConnectorV1") diff --git a/vllm_ascend/distributed/cpu_offload_manager/__init__.py b/vllm_ascend/distributed/kv_transfer/kv_p2p/__init__.py similarity index 100% rename from vllm_ascend/distributed/cpu_offload_manager/__init__.py rename to vllm_ascend/distributed/kv_transfer/kv_p2p/__init__.py diff --git a/vllm_ascend/distributed/mooncake_connector.py b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py similarity index 99% rename from vllm_ascend/distributed/mooncake_connector.py rename to vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py index 9c33f25d..cb9eeea5 100644 --- a/vllm_ascend/distributed/mooncake_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_connector.py @@ -41,8 +41,8 @@ from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.request import RequestStatus from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config -from vllm_ascend.distributed.mooncake_transfer_engine import global_te -from vllm_ascend.distributed.utils import get_transfer_timeout_value +from vllm_ascend.distributed.kv_transfer.utils.mooncake_transfer_engine import global_te +from vllm_ascend.distributed.kv_transfer.utils.utils import get_transfer_timeout_value from vllm_ascend.utils import is_vl_model if TYPE_CHECKING: diff --git a/vllm_ascend/distributed/mooncake_layerwise_connector.py b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_layerwise_connector.py similarity index 99% rename from vllm_ascend/distributed/mooncake_layerwise_connector.py rename to vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_layerwise_connector.py index f9e15d5c..ee3b5f3a 100644 --- a/vllm_ascend/distributed/mooncake_layerwise_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_p2p/mooncake_layerwise_connector.py @@ -32,11 +32,12 @@ from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.kv_cache_interface import KVCacheConfig from vllm_ascend.ascend_config import get_ascend_config -from vllm_ascend.distributed.mooncake_connector import GET_META_MSG -from vllm_ascend.distributed.mooncake_transfer_engine import global_te -from vllm_ascend.distributed.utils import (align_memory, - get_transfer_timeout_value, - kv_alltoall_and_rearrange) +from vllm_ascend.distributed.kv_transfer.kv_p2p.mooncake_connector import \ + GET_META_MSG +from vllm_ascend.distributed.kv_transfer.utils.mooncake_transfer_engine import \ + global_te +from vllm_ascend.distributed.kv_transfer.utils.utils import ( + align_memory, get_transfer_timeout_value, kv_alltoall_and_rearrange) from vllm_ascend.utils import npu_stream_switch if TYPE_CHECKING: diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/__init__.py b/vllm_ascend/distributed/kv_transfer/kv_pool/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/__init__.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vllm_ascend/distributed/kvpool/ascend_store_connector.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py similarity index 97% rename from vllm_ascend/distributed/kvpool/ascend_store_connector.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py index f1137612..b65a5743 100644 --- a/vllm_ascend/distributed/kvpool/ascend_store_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/ascend_store_connector.py @@ -16,9 +16,10 @@ from vllm.v1.kv_cache_interface import KVCacheConfig from vllm.v1.request import Request from vllm.v1.serial_utils import MsgpackDecoder -from vllm_ascend.distributed.kvpool.pool_scheduler import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.pool_scheduler import ( KVPoolScheduler, get_zmq_rpc_path_lookup) -from vllm_ascend.distributed.kvpool.pool_worker import KVPoolWorker +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.pool_worker import \ + KVPoolWorker class AscendStoreConnector(KVConnectorBase_V1): diff --git a/vllm_ascend/distributed/kvpool/__init__.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/__init__.py similarity index 100% rename from vllm_ascend/distributed/kvpool/__init__.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/__init__.py diff --git a/vllm_ascend/distributed/kvpool/backend/backend.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/backend.py similarity index 100% rename from vllm_ascend/distributed/kvpool/backend/backend.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/backend.py diff --git a/vllm_ascend/distributed/kvpool/backend/memcache_backend.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/memcache_backend.py similarity index 97% rename from vllm_ascend/distributed/kvpool/backend/memcache_backend.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/memcache_backend.py index fcf9120f..4769663b 100644 --- a/vllm_ascend/distributed/kvpool/backend/memcache_backend.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/memcache_backend.py @@ -5,7 +5,8 @@ import torch from vllm.config import ParallelConfig from vllm.logger import logger -from vllm_ascend.distributed.kvpool.backend.backend import Backend +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.backend import \ + Backend from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type diff --git a/vllm_ascend/distributed/kvpool/backend/mooncake_backend.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/mooncake_backend.py similarity index 97% rename from vllm_ascend/distributed/kvpool/backend/mooncake_backend.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/mooncake_backend.py index 53426971..25a103ca 100644 --- a/vllm_ascend/distributed/kvpool/backend/mooncake_backend.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/backend/mooncake_backend.py @@ -11,8 +11,10 @@ from vllm.config import ParallelConfig from vllm.logger import logger from vllm.utils.network_utils import get_ip -from vllm_ascend.distributed.kvpool.backend.backend import Backend -from vllm_ascend.distributed.mooncake_transfer_engine import global_te +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.backend import \ + Backend +from vllm_ascend.distributed.kv_transfer.utils.mooncake_transfer_engine import \ + global_te DEFAULT_GLOBAL_SEGMENT_SIZE = 3355443200 # 3.125 GiB DEFAULT_LOCAL_BUFFER_SIZE = 1073741824 # 1.0 GiB diff --git a/vllm_ascend/distributed/kvpool/config_data.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py similarity index 100% rename from vllm_ascend/distributed/kvpool/config_data.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/config_data.py diff --git a/vllm_ascend/distributed/kvpool/kv_transfer.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py similarity index 98% rename from vllm_ascend/distributed/kvpool/kv_transfer.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py index 4b2061d8..0eeedbf4 100644 --- a/vllm_ascend/distributed/kvpool/kv_transfer.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/kv_transfer.py @@ -7,10 +7,11 @@ from typing import Any import torch from vllm.logger import logger -from vllm_ascend.distributed.kvpool.backend.backend import Backend +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.backend import \ + Backend # isort: off -from vllm_ascend.distributed.kvpool.config_data import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.config_data import ( ChunkedTokenDatabase, LasyerMultiBlockReqMeta, ReqMeta, diff --git a/vllm_ascend/distributed/cpu_offload_manager/metadata.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/metadata.py similarity index 99% rename from vllm_ascend/distributed/cpu_offload_manager/metadata.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/metadata.py index 71242a86..31ff509f 100644 --- a/vllm_ascend/distributed/cpu_offload_manager/metadata.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/metadata.py @@ -14,7 +14,7 @@ from vllm.utils.network_utils import make_zmq_socket from vllm.utils.torch_utils import get_dtype_size from vllm.v1.kv_cache_interface import AttentionSpec, MLAAttentionSpec -from vllm_ascend.distributed.cpu_offload_manager.cpu_kv_cache_manager import \ +from vllm_ascend.distributed.kv_transfer.kv_pool.cpu_offload.cpu_kv_cache_manager import \ CPUKVCacheManager diff --git a/vllm_ascend/distributed/kvpool/pool_scheduler.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py similarity index 99% rename from vllm_ascend/distributed/kvpool/pool_scheduler.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py index 39cdbda0..3ca2ca49 100644 --- a/vllm_ascend/distributed/kvpool/pool_scheduler.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_scheduler.py @@ -13,7 +13,7 @@ from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.request import Request from vllm.v1.serial_utils import MsgpackEncoder -from vllm_ascend.distributed.kvpool.config_data import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.config_data import ( AscendConnectorMetadata, LoadSpec, ReqMeta, RequestTracker) diff --git a/vllm_ascend/distributed/kvpool/pool_worker.py b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py similarity index 98% rename from vllm_ascend/distributed/kvpool/pool_worker.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py index 92543aec..19f1c5b8 100644 --- a/vllm_ascend/distributed/kvpool/pool_worker.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/ascend_store/pool_worker.py @@ -11,15 +11,16 @@ from vllm.distributed import (get_decode_context_model_parallel_rank, from vllm.logger import logger from vllm.v1.core.kv_cache_utils import BlockHash -from vllm_ascend.distributed.kvpool.backend.backend import Backend -from vllm_ascend.distributed.kvpool.backend.memcache_backend import \ +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.backend import \ + Backend +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.memcache_backend import \ MemcacheBackend -from vllm_ascend.distributed.kvpool.backend.mooncake_backend import \ +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.backend.mooncake_backend import \ MooncakeBackend -from vllm_ascend.distributed.kvpool.config_data import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.config_data import ( AscendConnectorMetadata, ChunkedTokenDatabase, KeyMetadata, LasyerMultiBlockReqMeta, ReqMeta) -from vllm_ascend.distributed.kvpool.kv_transfer import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.kv_transfer import ( KVCacheStoreLayerRecvingThread, KVCacheStoreLayerSendingThread, KVCacheStoreRecvingThread, KVCacheStoreSendingThread, KVTransferThread) diff --git a/vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/__init__.py b/vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vllm_ascend/distributed/cpu_offload_manager/cpu_kv_cache_manager.py b/vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/cpu_kv_cache_manager.py similarity index 100% rename from vllm_ascend/distributed/cpu_offload_manager/cpu_kv_cache_manager.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/cpu_kv_cache_manager.py diff --git a/vllm_ascend/distributed/cpu_offload_connector.py b/vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/cpu_offload_connector.py similarity index 99% rename from vllm_ascend/distributed/cpu_offload_connector.py rename to vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/cpu_offload_connector.py index 9bcde279..49f9c77a 100644 --- a/vllm_ascend/distributed/cpu_offload_connector.py +++ b/vllm_ascend/distributed/kv_transfer/kv_pool/cpu_offload/cpu_offload_connector.py @@ -24,7 +24,7 @@ from vllm.v1.core.sched.output import SchedulerOutput from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheSpec, MambaSpec, MLAAttentionSpec) -from vllm_ascend.distributed.cpu_offload_manager.metadata import ( +from vllm_ascend.distributed.kv_transfer.kv_pool.ascend_store.metadata import ( MetadataServer, MetadataServerProc, MLAConfig) if TYPE_CHECKING: diff --git a/vllm_ascend/distributed/ucm_connector.py b/vllm_ascend/distributed/kv_transfer/ucm_connector.py similarity index 100% rename from vllm_ascend/distributed/ucm_connector.py rename to vllm_ascend/distributed/kv_transfer/ucm_connector.py diff --git a/vllm_ascend/distributed/kv_transfer/utils/__init__.py b/vllm_ascend/distributed/kv_transfer/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vllm_ascend/distributed/mooncake_transfer_engine.py b/vllm_ascend/distributed/kv_transfer/utils/mooncake_transfer_engine.py similarity index 100% rename from vllm_ascend/distributed/mooncake_transfer_engine.py rename to vllm_ascend/distributed/kv_transfer/utils/mooncake_transfer_engine.py diff --git a/vllm_ascend/distributed/kv_transfer/utils/utils.py b/vllm_ascend/distributed/kv_transfer/utils/utils.py new file mode 100644 index 00000000..c25c1f15 --- /dev/null +++ b/vllm_ascend/distributed/kv_transfer/utils/utils.py @@ -0,0 +1,61 @@ +import os + +import torch +import torch.distributed as dist + +from vllm_ascend.distributed.parallel_state import get_p_tp_group + + +def kv_alltoall_and_rearrange(pd_tp_ratio: int, key: torch.Tensor, + value: torch.TensorType): + if pd_tp_ratio <= 1: + return None, None + elif key is None or value is None: + raise ValueError("key or value is None") + k_output = alltoall_and_rearrange(pd_tp_ratio, key) + v_output = alltoall_and_rearrange(pd_tp_ratio, value) + return k_output, v_output + + +def alltoall_and_rearrange(tp_ratio: int, input_tensor: torch.Tensor): + num_kv_heads = input_tensor.size(1) + output_tensor = torch.zeros_like(input_tensor) + dist.all_to_all_single(output_tensor, + input_tensor, + group=get_p_tp_group().device_group) + input_tensor = 0 + result = rearrange_output(output_tensor, tp_ratio, num_kv_heads) + output_tensor = 0 + return result + + +def rearrange_output(base_output: torch.Tensor, cut_num: int, + num_kv_heads: int): + size_0 = base_output.size(0) + if size_0 % cut_num != 0: + raise ValueError( + f"The size of dim 0 [{size_0}] must be divisible by the cut_num [{cut_num}]" + ) + chunk_size = size_0 // cut_num + reshaped = base_output.view(cut_num, chunk_size, -1) + transposed = reshaped.transpose(0, 1) + return transposed.contiguous().view(size_0, num_kv_heads, -1) + + +def align_memory(tensor: torch.Tensor, alignment: int) -> torch.Tensor: + data_ptr = tensor.data_ptr() + aligned_addr = (data_ptr + alignment - 1) // alignment * alignment + offset = (aligned_addr - data_ptr) // tensor.element_size() + return tensor[int(offset):] + + +def get_transfer_timeout_value(): + ascend_transfer_timeout = os.getenv("ASCEND_TRANSFER_TIMEOUT", "") + if len(ascend_transfer_timeout) > 0: + return int(ascend_transfer_timeout) + hccl_rdma_timeout = int(os.getenv('HCCL_RDMA_TIMEOUT', + '20')) # type: ignore + hccl_rdma_retry_cnt = int(os.getenv('HCCL_RDMA_RETRY_CNT', + '7')) # type: ignore + return int((4.096 * (2**hccl_rdma_timeout)) * hccl_rdma_retry_cnt // 1000 + + 3000) diff --git a/vllm_ascend/distributed/kvpool/backend/__init__.py b/vllm_ascend/distributed/kvpool/backend/__init__.py deleted file mode 100644 index 8b137891..00000000 --- a/vllm_ascend/distributed/kvpool/backend/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/vllm_ascend/distributed/utils.py b/vllm_ascend/distributed/utils.py index 3a624de2..4a73e6c5 100644 --- a/vllm_ascend/distributed/utils.py +++ b/vllm_ascend/distributed/utils.py @@ -1,4 +1,3 @@ -import os from typing import Optional import torch @@ -6,63 +5,7 @@ import torch.distributed as dist from vllm.distributed.parallel_state import GroupCoordinator, get_dp_group from vllm.forward_context import get_forward_context -from vllm_ascend.distributed.parallel_state import (get_fc3_quant_x_group, - get_p_tp_group) - - -def kv_alltoall_and_rearrange(pd_tp_ratio: int, key: torch.Tensor, - value: torch.TensorType): - if pd_tp_ratio <= 1: - return None, None - elif key is None or value is None: - raise ValueError("key or value is None") - k_output = alltoall_and_rearrange(pd_tp_ratio, key) - v_output = alltoall_and_rearrange(pd_tp_ratio, value) - return k_output, v_output - - -def alltoall_and_rearrange(tp_ratio: int, input_tensor: torch.Tensor): - num_kv_heads = input_tensor.size(1) - output_tensor = torch.zeros_like(input_tensor) - dist.all_to_all_single(output_tensor, - input_tensor, - group=get_p_tp_group().device_group) - input_tensor = 0 - result = rearrange_output(output_tensor, tp_ratio, num_kv_heads) - output_tensor = 0 - return result - - -def rearrange_output(base_output: torch.Tensor, cut_num: int, - num_kv_heads: int): - size_0 = base_output.size(0) - if size_0 % cut_num != 0: - raise ValueError( - f"The size of dim 0 [{size_0}] must be divisible by the cut_num [{cut_num}]" - ) - chunk_size = size_0 // cut_num - reshaped = base_output.view(cut_num, chunk_size, -1) - transposed = reshaped.transpose(0, 1) - return transposed.contiguous().view(size_0, num_kv_heads, -1) - - -def align_memory(tensor: torch.Tensor, alignment: int) -> torch.Tensor: - data_ptr = tensor.data_ptr() - aligned_addr = (data_ptr + alignment - 1) // alignment * alignment - offset = (aligned_addr - data_ptr) // tensor.element_size() - return tensor[int(offset):] - - -def get_transfer_timeout_value(): - ascend_transfer_timeout = os.getenv("ASCEND_TRANSFER_TIMEOUT", "") - if len(ascend_transfer_timeout) > 0: - return int(ascend_transfer_timeout) - hccl_rdma_timeout = int(os.getenv('HCCL_RDMA_TIMEOUT', - '20')) # type: ignore - hccl_rdma_retry_cnt = int(os.getenv('HCCL_RDMA_RETRY_CNT', - '7')) # type: ignore - return int((4.096 * (2**hccl_rdma_timeout)) * hccl_rdma_retry_cnt // 1000 + - 3000) +from vllm_ascend.distributed.parallel_state import get_fc3_quant_x_group def fc3_all_gather_and_maybe_unpad_impl(x: torch.Tensor, ) -> torch.Tensor: @@ -90,6 +33,7 @@ def fc3_all_gather_and_maybe_unpad_impl(x: torch.Tensor, ) -> torch.Tensor: result[offset:offset + num_tokens_dp] = x[idx, :num_tokens_dp] offset += num_tokens_dp x = result + return x diff --git a/vllm_ascend/patch/worker/patch_distributed.py b/vllm_ascend/patch/worker/patch_distributed.py index 2b462902..ed01a010 100644 --- a/vllm_ascend/patch/worker/patch_distributed.py +++ b/vllm_ascend/patch/worker/patch_distributed.py @@ -23,7 +23,8 @@ from torch.distributed import Backend from vllm.distributed.parallel_state import (GroupCoordinator, _get_unique_name, _register_group) -from vllm_ascend.distributed.communicator import NPUCommunicator +from vllm_ascend.distributed.device_communicators.npu_communicator import \ + NPUCommunicator from vllm_ascend.utils import create_hccl_pg_options diff --git a/vllm_ascend/platform.py b/vllm_ascend/platform.py index f37a1ad8..c93aa78b 100644 --- a/vllm_ascend/platform.py +++ b/vllm_ascend/platform.py @@ -411,7 +411,7 @@ class NPUPlatform(Platform): @classmethod def get_device_communicator_cls(cls) -> str: - return "vllm_ascend.distributed.communicator.NPUCommunicator" + return "vllm_ascend.distributed.device_communicators.npu_communicator.NPUCommunicator" @classmethod def is_pin_memory_available(cls):