diff --git a/docs/source/tutorials/index.md b/docs/source/tutorials/index.md
index 37bfb9c..da88906 100644
--- a/docs/source/tutorials/index.md
+++ b/docs/source/tutorials/index.md
@@ -18,6 +18,7 @@ multi-node_dsv3.2.md
 multi_node
 multi_node_kimi
 multi_node_qwen3vl
-multi_node_pd_disaggregation
+multi_node_pd_disaggregation_llmdatadist
+multi_node_pd_disaggregation_mooncake
 multi_node_ray
 :::
diff --git a/docs/source/tutorials/multi_node_pd_disaggregation.md b/docs/source/tutorials/multi_node_pd_disaggregation_llmdatadist.md
similarity index 99%
rename from docs/source/tutorials/multi_node_pd_disaggregation.md
rename to docs/source/tutorials/multi_node_pd_disaggregation_llmdatadist.md
index e334973..4814ae1 100644
--- a/docs/source/tutorials/multi_node_pd_disaggregation.md
+++ b/docs/source/tutorials/multi_node_pd_disaggregation_llmdatadist.md
@@ -1,4 +1,4 @@
-# Prefill-Decode Disaggregation Verification (Qwen)
+# Prefill-Decode Disaggregation LLMDataDist Verification (Qwen)
 
 ## Getting Start
 
diff --git a/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md b/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md
new file mode 100644
index 0000000..8f81609
--- /dev/null
+++ b/docs/source/tutorials/multi_node_pd_disaggregation_mooncake.md
@@ -0,0 +1,596 @@
+# Prefill-Decode Disaggregation Mooncake Verification (Qwen)
+
+## Getting Started
+
+vLLM-Ascend now supports prefill-decode (PD) disaggregation with EP (Expert Parallel) options. This guide walks through the steps needed to verify these features with constrained resources.
+
+Taking the Qwen3-235B model as an example, we use vllm-ascend v0.11.0rc1 (with vLLM v0.11.0) on 4 Atlas 800T A3 servers to deploy the "2P1D" architecture. Assume the IPs of the prefiller servers are 192.0.0.1 (prefill 1) and 192.0.0.2 (prefill 2), and the IPs of the decoder servers are 192.0.0.3 (decoder 1) and 192.0.0.4 (decoder 2). On each server, 8 NPUs (16 chips) are used to deploy one service instance.
+
+## Verify Multi-Node Communication Environment
+
+### Physical Layer Requirements
+
+- The physical machines must be located on the same WLAN, with network connectivity.
+- All NPUs must be interconnected. Intra-node connectivity is via HCCS, and inter-node connectivity is via RDMA.
+
+### Verification Process
+
+1. Single Node Verification:
+
+Execute the following commands on each node in sequence. The results must all be `success` and the status must be `UP`:
+
+```bash
+# Check the remote switch ports
+for i in {0..15}; do hccn_tool -i $i -lldp -g | grep Ifname; done
+# Get the link status of the Ethernet ports (UP or DOWN)
+for i in {0..15}; do hccn_tool -i $i -link -g ; done
+# Check the network health status
+for i in {0..15}; do hccn_tool -i $i -net_health -g ; done
+# View the network detected IP configuration
+for i in {0..15}; do hccn_tool -i $i -netdetect -g ; done
+# View gateway configuration
+for i in {0..15}; do hccn_tool -i $i -gateway -g ; done
+# View NPU network configuration
+cat /etc/hccn.conf
+```
+
+2. Get NPU IP Addresses
+
+```bash
+for i in {0..15}; do hccn_tool -i $i -ip -g | grep ipaddr; done
+```
+
+3. Cross-Node PING Test
+
+```bash
+# Execute on the target node (replace 'x.x.x.x' with the actual NPU IP address)
+for i in {0..15}; do hccn_tool -i $i -ping -g address x.x.x.x;done
+```
+
+## Install Mooncake
+
+Mooncake is the serving platform for Kimi, a leading LLM service provided by Moonshot AI. First, we need to obtain the Mooncake project. 
Refer to the following command: + +```shell +git clone -b pooling_async_memecpy_v1 https://github.com/AscendTransport/Mooncake +``` + +Update and install Python + +```shell +apt-get install python3 +``` + +Install the relevant dependencies. + +```shell +cd Mooncake +bash dependencies.sh +``` + +Install mpi + +```shell +apt purge mpich libmpich-dev +apt purge openmpi-bin +apt purge openmpi-bin libopenmpi-dev +apt install mpich libmpich-dev +export CPATH=/usr/lib/aarch64-linux-gnu/mpich/include/:$CPATH +export CPATH=/usr/lib/aarch64-linux-gnu/openmpi/lib:$CPATH +``` + +Compile and install + +```shell +mkdir build +cd build +cmake .. +make -j +make install +cp mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/libascend_transport_mem.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/ +cp mooncake-transfer-engine/src/libtransfer_engine.so /usr/local/Ascend/ascend-toolkit/latest/python/site-packages/ +export LD_LIBRARY_PATH=/usr/local/Ascend/ascend-toolkit/latest/python/site-packages:$LD_LIBRARY_PATH +``` + +## Prefiller / Decoder Deployment + +We can run the following scripts to launch a server on the prefiller/decoder node respectively. + +### layerwise + +:::::{tab-set} + +::::{tab-item} Prefiller node 1 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.1 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=1024 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +export ASCEND_AGGREGATE_ENABLE=1 +export ASCEND_TRANSPORT_PRINT=0 +export ACL_OP_INIT_MODE=1 +export ASCEND_A3_ENABLE=1 +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 2 \ + --data-parallel-size 2 \ + --data-parallel-size-local 2 \ + --data-parallel-address 192.0.0.1 \ + --data-parallel-rpc-port 13389 \ + --tensor-parallel-size 8 \ + --enable-expert-parallel \ + --seed 1024 \ + --enforce-eager \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 4096 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeLayerwiseConnector", + "kv_role": "kv_producer", + "kv_port": "30000", + "engine_id": "0", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", + "kv_connector_extra_config": { + "use_ascend_direct": true, + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::{tab-item} Prefiller node 2 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.2 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=1024 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +export ASCEND_AGGREGATE_ENABLE=1 +export ASCEND_TRANSPORT_PRINT=0 +export ACL_OP_INIT_MODE=1 +export ASCEND_A3_ENABLE=1 +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 2 \ + --data-parallel-size 2 \ + --data-parallel-size-local 2 \ + --data-parallel-address 192.0.0.2 \ + --data-parallel-rpc-port 13389 \ + --tensor-parallel-size 8 \ + --enable-expert-parallel \ + --seed 1024 \ + --enforce-eager \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe 
\ + --max-model-len 32768 \ + --max-num-batched-tokens 4096 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeLayerwiseConnector", + "kv_role": "kv_producer", + "kv_port": "30100", + "engine_id": "1", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", + "kv_connector_extra_config": { + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::{tab-item} Decoder node 1 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.3 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=2048 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +export ASCEND_AGGREGATE_ENABLE=1 +export ASCEND_TRANSPORT_PRINT=0 +export ACL_OP_INIT_MODE=1 +export ASCEND_A3_ENABLE=1 +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 4 \ + --data-parallel-size 32 \ + --data-parallel-size-local 16 \ + --data-parallel-address 192.0.0.3 \ + --data-parallel-rpc-port 5964 \ + --tensor-parallel-size 1 \ + --enable-expert-parallel \ + --seed 1024 \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 512 \ + --max-num_seqs 16 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --compilation-config '{"cudagraph_capture_sizes":[16]}' \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeLayerwiseConnector", + "kv_role": "kv_consumer", + "kv_port": "30200", + "engine_id": "2", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", + "kv_connector_extra_config": { + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::{tab-item} Decoder node 2 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.4 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=2048 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +export ASCEND_AGGREGATE_ENABLE=1 +export ASCEND_TRANSPORT_PRINT=0 +export ACL_OP_INIT_MODE=1 +export ASCEND_A3_ENABLE=1 +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --headless \ + --data-parallel-size 32 \ + --data-parallel-size-local 16 \ + --data-parallel-start-rank 16 \ + --data-parallel-address 192.0.0.3 \ + --data-parallel-rpc-port 5964 \ + --tensor-parallel-size 1 \ + --enable-expert-parallel \ + --seed 1024 \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 512 \ + --max-num_seqs 16 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --compilation-config '{"cudagraph_capture_sizes":[16]}' \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeLayerwiseConnector", + "kv_role": "kv_consumer", + "kv_port": "30300", + "engine_id": "3", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_layerwise_connector", + "kv_connector_extra_config": { + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::: + +### mooncake + +:::::{tab-set} + 
+::::{tab-item} Prefiller node 1 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.1 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=1024 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 2 \ + --data-parallel-size 2 \ + --data-parallel-size-local 2 \ + --data-parallel-address 192.0.0.1 \ + --data-parallel-rpc-port 13389 \ + --tensor-parallel-size 8 \ + --enable-expert-parallel \ + --seed 1024 \ + --enforce-eager \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 4096 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeConnector", + "kv_role": "kv_producer", + "kv_port": "30000", + "engine_id": "0", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", + "kv_connector_extra_config": { + "use_ascend_direct": true, + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::{tab-item} Prefiller node 2 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.2 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=1024 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 +vllm serve /mnt/weight/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 2 \ + --data-parallel-size 2 \ + --data-parallel-size-local 2 \ + --data-parallel-address 192.0.0.2 \ + --data-parallel-rpc-port 13389 \ + --tensor-parallel-size 8 \ + --enable-expert-parallel \ + --seed 1024 \ + --enforce-eager \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 4096 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --kv-transfer-config \ + '{"kv_connector": "MooncakeConnector", + "kv_role": "kv_producer", + "kv_port": "30100", + "engine_id": "1", + "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector", + "kv_connector_extra_config": { + "prefill": { + "dp_size": 2, + "tp_size": 8 + }, + "decode": { + "dp_size": 32, + "tp_size": 1 + } + } + }' +``` + +:::: + +::::{tab-item} Decoder node 1 + +```shell +unset ftp_proxy +unset https_proxy +unset http_proxy +export HCCL_IF_IP=192.0.0.3 +export GLOO_SOCKET_IFNAME="eth0" # network card name +export TP_SOCKET_IFNAME="eth0" +export HCCL_SOCKET_IFNAME="eth0" +export VLLM_USE_V1=1 +export HCCL_BUFFSIZE=2048 +export OMP_PROC_BIND=false +export OMP_NUM_THREADS=10 + +vllm serve /model/Qwen3-235B-A22B-W8A8 \ + --host 0.0.0.0 \ + --port 8004 \ + --api-server-count 4 \ + --data-parallel-size 32 \ + --data-parallel-size-local 16 \ + --data-parallel-address 192.0.0.3 \ + --data-parallel-rpc-port 5964 \ + --tensor-parallel-size 1 \ + --enable-expert-parallel \ + --seed 1024 \ + --distributed-executor-backend mp \ + --served-model-name qwen3-moe \ + --max-model-len 32768 \ + --max-num-batched-tokens 512 \ + --max-num_seqs 16 \ + --trust-remote-code \ + --no-enable-prefix-caching \ + --gpu-memory-utilization 0.9 \ + --compilation-config '{"cudagraph_capture_sizes":[16]}' \ + 
--kv-transfer-config \
+    '{"kv_connector": "MooncakeConnector",
+    "kv_role": "kv_consumer",
+    "kv_port": "30200",
+    "engine_id": "2",
+    "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
+    "kv_connector_extra_config": {
+        "prefill": {
+            "dp_size": 2,
+            "tp_size": 8
+        },
+        "decode": {
+            "dp_size": 32,
+            "tp_size": 1
+        }
+    }
+    }'
+```
+
+::::
+
+::::{tab-item} Decoder node 2
+
+```shell
+unset ftp_proxy
+unset https_proxy
+unset http_proxy
+export HCCL_IF_IP=192.0.0.4
+export GLOO_SOCKET_IFNAME="eth0" # network card name
+export TP_SOCKET_IFNAME="eth0"
+export HCCL_SOCKET_IFNAME="eth0"
+export VLLM_USE_V1=1
+export HCCL_BUFFSIZE=2048
+export OMP_PROC_BIND=false
+export OMP_NUM_THREADS=10
+
+vllm serve /model/Qwen3-235B-A22B-W8A8 \
+    --host 0.0.0.0 \
+    --port 8004 \
+    --headless \
+    --data-parallel-size 32 \
+    --data-parallel-size-local 16 \
+    --data-parallel-start-rank 16 \
+    --data-parallel-address 192.0.0.3 \
+    --data-parallel-rpc-port 5964 \
+    --tensor-parallel-size 1 \
+    --enable-expert-parallel \
+    --seed 1024 \
+    --distributed-executor-backend mp \
+    --served-model-name qwen3-moe \
+    --max-model-len 32768 \
+    --max-num-batched-tokens 512 \
+    --max-num-seqs 16 \
+    --trust-remote-code \
+    --no-enable-prefix-caching \
+    --gpu-memory-utilization 0.9 \
+    --compilation-config '{"cudagraph_capture_sizes":[16]}' \
+    --kv-transfer-config \
+    '{"kv_connector": "MooncakeConnector",
+    "kv_role": "kv_consumer",
+    "kv_port": "30300",
+    "engine_id": "3",
+    "kv_connector_module_path": "vllm_ascend.distributed.mooncake_connector",
+    "kv_connector_extra_config": {
+        "prefill": {
+            "dp_size": 2,
+            "tp_size": 8
+        },
+        "decode": {
+            "dp_size": 32,
+            "tp_size": 1
+        }
+    }
+    }'
+```
+
+::::
+
+:::::
+
+## Example Proxy for Deployment
+
+Run a proxy server on the same node as a prefiller service instance. You can get the proxy program in the repository's examples: [load\_balance\_proxy\_layerwise\_server\_example.py](https://github.com/vllm-project/vllm-ascend/blob/main/examples/disaggregated_prefill_v1/load_balance_proxy_server_example.py)
+
+```shell
+python load_balance_proxy_layerwise_server_example.py \
+    --host 192.0.0.1 \
+    --port 8080 \
+    --prefiller-hosts 192.0.0.1 192.0.0.2 \
+    --prefiller-ports 8004 8004 \
+    --decoder-hosts 192.0.0.3 192.0.0.4 \
+    --decoder-ports 8004 8004
+```
+
+## Verification
+
+Check service health using the proxy server endpoint.
+ +```shell +curl http://192.0.0.1:8080/v1/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "qwen3-moe", + "prompt": "Who are you?", + "max_tokens": 100, + "temperature": 0 + }' +``` diff --git a/tests/ut/distributed/test_parallel_state.py b/tests/ut/distributed/test_parallel_state.py index f6c3315..c6724ce 100644 --- a/tests/ut/distributed/test_parallel_state.py +++ b/tests/ut/distributed/test_parallel_state.py @@ -32,8 +32,13 @@ def test_init_ascend_model_parallel(mock_distributed, parallel_config): mock_ascend_config.lmhead_tensor_parallel_size = 2 mock_ascend_config.oproj_tensor_parallel_size = 2 mock_ascend_config.pd_tp_ratio = 2 + mock_ascend_config.num_head_replica = 0 + mock_ascend_config.pd_head_ratio = 2 + mock_vllm_config = MagicMock() + mock_vllm_config.kv_transfer_config.is_kv_producer = True with patch('vllm_ascend.distributed.parallel_state.model_parallel_initialized', return_value=False), \ patch('vllm_ascend.distributed.parallel_state.init_model_parallel_group'), \ + patch('vllm_ascend.distributed.parallel_state.get_current_vllm_config', return_value=mock_vllm_config), \ patch('vllm_ascend.distributed.parallel_state.get_ascend_config', return_value=mock_ascend_config): init_ascend_model_parallel(parallel_config) diff --git a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py index c7a1fcc..079df07 100644 --- a/tests/ut/kv_connector/test_mooncake_layerwise_connector.py +++ b/tests/ut/kv_connector/test_mooncake_layerwise_connector.py @@ -78,7 +78,8 @@ class TestKVCacheSendingLayerThreadBasic(unittest.TestCase): def setUp(self): self.p1 = patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config', - new=MagicMock(return_value=SimpleNamespace(pd_tp_ratio=1))) + new=MagicMock(return_value=SimpleNamespace( + pd_tp_ratio=1, num_head_replica=0, pd_head_ratio=1))) self.p2 = patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_current_vllm_config', new=MagicMock(return_value=SimpleNamespace( @@ -242,7 +243,8 @@ class TestSendingLayerThread(unittest.TestCase): def setUp(self): self.p1 = patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config', - new=MagicMock(return_value=SimpleNamespace(pd_tp_ratio=1))) + new=MagicMock(return_value=SimpleNamespace( + pd_tp_ratio=1, num_head_replica=0, pd_head_ratio=1))) self.p2 = patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_current_vllm_config', new=MagicMock(return_value=SimpleNamespace( @@ -900,7 +902,9 @@ class TestMooncakeLayerwiseConnectorWorker(unittest.TestCase): {'vllm_ascend.envs': self.envs_ascend_mock}), patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_ascend_config', - return_value=SimpleNamespace(pd_tp_ratio=1), + return_value=SimpleNamespace(pd_tp_ratio=1, + num_head_replica=0, + pd_head_ratio=1), ), patch( 'vllm_ascend.distributed.mooncake_layerwise_connector.get_current_vllm_config', diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py index 93579ce..2f2de07 100644 --- a/vllm_ascend/ascend_config.py +++ b/vllm_ascend/ascend_config.py @@ -100,13 +100,28 @@ class AscendConfig: "oproj_tensor_parallel_size is only supported in pd scenario and can only be used in D node." 
) self.pd_tp_ratio = 1 + self.pd_head_ratio = 1 + self.num_head_replica = 0 if vllm_config.kv_transfer_config is not None and not vllm_config.model_config.is_deepseek_mla: prefill_tp_size = vllm_config.kv_transfer_config.get_from_extra_config( "prefill", {"tp_size": 1})["tp_size"] decode_tp_size = vllm_config.kv_transfer_config.get_from_extra_config( "decode", {"tp_size": 1})["tp_size"] - pd_tp_ratio: int = prefill_tp_size // decode_tp_size - self.pd_tp_ratio = pd_tp_ratio + assert prefill_tp_size % decode_tp_size == 0, "Prefill TP size must be divisible by Decode TP size." + self.pd_tp_ratio = prefill_tp_size // decode_tp_size + if self.pd_tp_ratio > 1: + try: + # only support Qwen model now + # TODO: use a more robust method to get kv_head_num + num_kv_head = vllm_config.model_config.hf_config.num_key_value_heads + self.num_head_replica = prefill_tp_size // num_kv_head + prefill_tp_size = min(prefill_tp_size, num_kv_head) + decode_tp_size = min(decode_tp_size, num_kv_head) + self.pd_head_ratio = prefill_tp_size // decode_tp_size + except Exception: + raise AssertionError( + "Can not get num_key_value_heads from model_config") + if self.pd_tp_ratio == 0: raise AssertionError( "Only support P node tp size lagger then D node tp size") diff --git a/vllm_ascend/distributed/mooncake_layerwise_connector.py b/vllm_ascend/distributed/mooncake_layerwise_connector.py index c199a37..87f59e8 100644 --- a/vllm_ascend/distributed/mooncake_layerwise_connector.py +++ b/vllm_ascend/distributed/mooncake_layerwise_connector.py @@ -277,7 +277,7 @@ class SendingLayerThread(threading.Thread): self.send_queue = queue.Queue[tuple[DecodeMooncakeAgentMetadata, str, list[int], int, torch.Tensor, torch.Tensor]]() - self.completion_event: threading.Event + self.completion_event: Optional[threading.Event] = None self.completion_event_count: int self.task_tracker = task_tracker self.total_layers = total_layers @@ -287,6 +287,8 @@ class SendingLayerThread(threading.Thread): self.engine = engine self.tp_rank = tp_rank self.pd_tp_ratio = get_ascend_config().pd_tp_ratio + self.num_head_replica = get_ascend_config().num_head_replica + self.pd_head_ratio = get_ascend_config().pd_head_ratio vllm_config = get_current_vllm_config() max_model_len = vllm_config.scheduler_config.max_model_len first_kv_cache = first_kv_cache[:max_model_len] @@ -358,7 +360,9 @@ class SendingLayerThread(threading.Thread): remote_kv_base_addrs = req_meta.kv_caches_base_addr remote_block_ids = req_meta.block_ids - if self.pd_tp_ratio == 1: + if self.num_head_replica >= 1 and self.tp_rank % self.num_head_replica != 0: + pass + elif self.pd_head_ratio == 1: layer_local_kv_base_addr = [ self.local_kv_base_addr[i] for i in [2 * layer_index, 2 * layer_index + 1] @@ -420,7 +424,7 @@ class SendingLayerThread(threading.Thread): src_layer_addr = src_layer_base_addr for group_remote_block_id in grouped_remote_block_ids: block_len = self.block_len[0] - remote_block_len = self.block_len[0] * self.pd_tp_ratio + remote_block_len = self.block_len[0] * self.pd_head_ratio src_list.append(src_layer_addr) if src_layer_addr + len( @@ -436,23 +440,21 @@ class SendingLayerThread(threading.Thread): dst_list.append(dst_layer_base_addr + group_remote_block_id[0] * remote_block_len + length * - (self.tp_rank % self.pd_tp_ratio)) + ((self.tp_rank // self.num_head_replica) % + self.pd_head_ratio)) src_layer_addr += length torch.npu.synchronize() ret = self.engine.batch_transfer_sync_write( session_id, src_list, dst_list, length_list) - self.completion_event_count -= 1 - - if 
self.completion_event_count == 0 and self.completion_event is not None: - print( - f"[_transfer_kv_cache] {self.completion_event_count} self.event.set()" - ) - self.completion_event.set() - if ret < 0: logger.error("Mooncake transfer failed for request %s", req_meta.req_id) raise RuntimeError(f"Mooncake transfer failed, ret: {ret}") + if self.completion_event is not None: + self.completion_event_count -= 1 + if self.completion_event_count == 0: + self.completion_event.set() + self.completion_event = None def add_event(self, event: threading.Event, count: int) -> None: self.completion_event = event @@ -924,6 +926,8 @@ class MooncakeLayerwiseConnectorWorker: self.kv_caches_base_addr: list[int] = [] self.pd_tp_ratio = get_ascend_config().pd_tp_ratio + self.pd_head_ratio = get_ascend_config().pd_head_ratio + self.first_kv_cache = None def _get_prefill_decode_size(self, vllm_config: VllmConfig): @@ -1104,7 +1108,7 @@ class MooncakeLayerwiseConnectorWorker: path = make_zmq_path( "tcp", meta.remote_host, meta.remote_port + self.tp_rank * self.pd_tp_ratio + offset) - logger.debug( + logger.info( f"Notify the prefiller: {path} that request: {req_id} from decoder is ready." ) msg_encoder = msgspec.msgpack.Encoder() @@ -1142,7 +1146,7 @@ class MooncakeLayerwiseConnectorWorker: **kwargs) -> None: """MooncakeLayerwiseConnector does not save explicitly.""" if self.kv_role == 'kv_producer': - if self.pd_tp_ratio != 1: + if self.pd_head_ratio != 1: if self.current_layer != 0: self.completion_event.wait() self.completion_event = threading.Event() @@ -1153,8 +1157,9 @@ class MooncakeLayerwiseConnectorWorker: def sort_kv_cache(input_kv: list[list[int]]): return torch.cat([ - torch.chunk(tensor, self.pd_tp_ratio, dim=0)[x] - for x in range(self.pd_tp_ratio) for tensor in input_kv + torch.chunk(tensor, self.pd_head_ratio, dim=0)[x] + for x in range(self.pd_head_ratio) + for tensor in input_kv ]) total_block_ids = [ @@ -1176,7 +1181,7 @@ class MooncakeLayerwiseConnectorWorker: keys = sort_kv_cache(keys) # [req1_key, req2_key] values = sort_kv_cache(values) (keys, - values) = kv_alltoall_and_rearrange(self.pd_tp_ratio, keys, + values) = kv_alltoall_and_rearrange(self.pd_head_ratio, keys, values) key_start_id = 0 value_start_id = 0 @@ -1185,7 +1190,7 @@ class MooncakeLayerwiseConnectorWorker: value = None for req_id, request in connector_metadata.requests.items(): logger.info(f"Add request {req_id} to kv send layer thread. 
") - if self.pd_tp_ratio != 1: + if self.pd_head_ratio != 1: key_block_num = len( request.local_block_ids) * key_block_size value_block_num = len( diff --git a/vllm_ascend/distributed/parallel_state.py b/vllm_ascend/distributed/parallel_state.py index 071a234..49f6e47 100644 --- a/vllm_ascend/distributed/parallel_state.py +++ b/vllm_ascend/distributed/parallel_state.py @@ -1,7 +1,7 @@ from typing import Optional import torch -from vllm.config import ParallelConfig +from vllm.config import ParallelConfig, get_current_vllm_config from vllm.distributed.parallel_state import (GroupCoordinator, get_world_group, init_model_parallel_group) @@ -63,19 +63,42 @@ def init_ascend_model_parallel(parallel_config: ParallelConfig, ): parallel_config.tensor_parallel_size) pd_tp_ratio = get_ascend_config().pd_tp_ratio + pd_head_ratio = get_ascend_config().pd_head_ratio global _P_TP assert _P_TP is None, ( "distributed prefill tensor parallel group is already initialized") - prefill_tensor_model_parallel_size = pd_tp_ratio if \ - pd_tp_ratio > 0 and pd_tp_ratio < parallel_config.tensor_parallel_size else parallel_config.tensor_parallel_size - group_ranks = all_ranks.view(-1, - prefill_tensor_model_parallel_size).unbind(0) - group_ranks = [x.tolist() for x in group_ranks] - num = get_world_group().local_rank // pd_tp_ratio - _P_TP = init_model_parallel_group(group_ranks, - get_world_group().local_rank, - backend, - group_name=f"p_tp_{num}") + prefill_tensor_model_parallel_size = pd_tp_ratio + # divide alltoall groups + if pd_head_ratio > 1 and get_current_vllm_config( + ).kv_transfer_config.is_kv_producer: + num_head_replica = get_ascend_config().num_head_replica + remote_tp_size = parallel_config.tensor_parallel_size // pd_tp_ratio + if num_head_replica <= 1: + group_ranks = all_ranks.view( + -1, prefill_tensor_model_parallel_size).unbind(0) + else: + group_ranks = all_ranks.clone().view( + parallel_config.data_parallel_size, -1, + num_head_replica) # [DP_size, num_head, num_head_replica] + group_ranks = group_ranks.permute(0, 2, 1) + group_ranks = group_ranks.reshape( + -1, + group_ranks.size(-1)) # [DP_size * num_head_replica, num_head] + alltoall_group_size = group_ranks.size(-1) // remote_tp_size + group_ranks = group_ranks.unsqueeze(-1).view( + parallel_config.data_parallel_size, num_head_replica, -1, + alltoall_group_size + ) # [DP_size, num_head_replica, num_alltoall_group, alltoall_group_size] + group_ranks = group_ranks.view(-1, alltoall_group_size).unbind(0) + group_ranks = [x.tolist() for x in group_ranks] + local_rank = get_world_group().local_rank + num = next( + (i for i, ranks in enumerate(group_ranks) if local_rank in ranks), + None) + _P_TP = init_model_parallel_group(group_ranks, + get_world_group().local_rank, + backend, + group_name=f"p_tp_{num}") global _MC2 group_ranks = all_ranks.unbind(0)