diff --git a/docs/source/developer_guide/feature_guide/eplb_swift_balancer.md b/docs/source/developer_guide/feature_guide/eplb_swift_balancer.md index 999dc6e1..7ed8e14f 100644 --- a/docs/source/developer_guide/feature_guide/eplb_swift_balancer.md +++ b/docs/source/developer_guide/feature_guide/eplb_swift_balancer.md @@ -179,7 +179,7 @@ class RandomLoadBalance(EplbPolicy): #### Integer Parameters -All integer input parameters must explicitly specify their maximum and minimum values and be subject to valid value validation. For example, `num_iterations_eplb_update` must be greater than 0: +All integer input parameters must explicitly specify their maximum and minimum values and be subject to valid value validation. For example, `expert_heat_collection_interval` must be greater than 0: ```python @staticmethod diff --git a/docs/source/user_guide/configuration/additional_config.md b/docs/source/user_guide/configuration/additional_config.md index 1e0cc491..02e09e62 100644 --- a/docs/source/user_guide/configuration/additional_config.md +++ b/docs/source/user_guide/configuration/additional_config.md @@ -30,6 +30,7 @@ The following table lists additional configuration options available in vLLM Asc | `weight_prefetch_config` | dict | `{}` | Configuration options for weight prefetch | | `finegrained_tp_config` | dict | `{}` | Configuration options for module tensor parallelism | | `ascend_compilation_config` | dict | `{}` | Configuration options for ascend compilation | +| `eplb_config` | dict | `{}` | Configuration options for ascend compilation | | `refresh` | bool | `false` | Whether to refresh global Ascend configuration content. This is usually used by rlhf or ut/e2e test case. | | `dump_config_path` | str | `None` | Configuration file path for msprobe dump(eager mode). | | `enable_async_exponential` | bool | `False` | Whether to enable async exponential overlap. To enable async exponential, set this config to True. | @@ -41,13 +42,6 @@ The following table lists additional configuration options available in vLLM Asc | `SLO_limits_for_dynamic_batch` | int | `-1` | SLO limits for dynamic batch. This is new scheduler to support dynamic feature | | `enable_npugraph_ex` | bool | `False` | Whether to enable npugraph ex graph mode. | | `pa_shape_list` | list | `[]` | The custom shape list of page attention ops. | -| `dynamic_eplb` | bool | `False` | Whether to enable dynamic EPLB. | -| `expert_map_path` | str | `None` | When using expert load balancing for an MoE model, an expert map path needs to be passed in. | -| `num_iterations_eplb_update` | int | `400` | Forward iterations when EPLB begins. | -| `gate_eplb` | bool | `False` | Whether to enable EPLB only once. | -| `num_wait_worker_iterations` | int | `30` | The forward iterations when the EPLB worker will finish CPU tasks. In our test default value 30 can cover most cases. | -| `expert_map_record_path` | str | `None` | Save the expert load calculation results to a new expert table in the specified directory. | -| `init_redundancy_expert` | int | `0` | Specify redundant experts during initialization. | | `enable_kv_nz` | bool | `False` | Whether to enable kvcache NZ layout. This option only takes effects on models using MLA (e.g., DeepSeek). | | `layer_sharding` | dict | `{}` | Configuration options for layer sharding linear | @@ -83,6 +77,17 @@ The details of each configuration option are as follows: | `fuse_norm_quant` | bool | `True` | Whether to enable fuse_norm_quant pass. | | `fuse_qknorm_rope` | bool | `False` | Whether to enable fuse_qknorm_rope pass. It's set to True by default when Triton is installed. | +**eplb_config** + +| Name | Type | Default | Description | +| ---- | ---- | ------- | ----------- | +| `dynamic_eplb` | bool| `False`| Whether to enable dynamic EPLB. | +| `expert_map_path` | str | `None` | When using expert load balancing for an MoE model, an expert map path needs to be passed in.| +| `expert_heat_collection_interval`| int | `400` | Forward iterations when EPLB begins. | +| `algorithm_execution_interval` | int | `30` | The forward iterations when the EPLB worker will finish CPU tasks. | +| `expert_map_record_path` | str | `None` | Save the expert load calculation results to a new expert table in the specified directory.| +| `num_redundant_experts` | int | `0` | Specify redundant experts during initialization. | + ### Example An example of additional configuration is as follows: diff --git a/docs/source/user_guide/feature_guide/eplb_swift_balancer.md b/docs/source/user_guide/feature_guide/eplb_swift_balancer.md index 5e6dc4eb..35f78931 100644 --- a/docs/source/user_guide/feature_guide/eplb_swift_balancer.md +++ b/docs/source/user_guide/feature_guide/eplb_swift_balancer.md @@ -26,17 +26,17 @@ W8A8-dynamic ### Dynamic EPLB -We need to add environment variable `export DYNAMIC_EPLB="true"` to enable vllm eplb. Enable dynamic balancing with auto-tuned parameters. Adjust num_iterations_eplb_update and num_wait_worker_iterations based on workload patterns. +We need to add environment variable `export DYNAMIC_EPLB="true"` to enable vllm eplb. Enable dynamic balancing with auto-tuned parameters. Adjust expert_heat_collection_interval and algorithm_execution_interval based on workload patterns. ```shell vllm serve Qwen/Qwen3-235B-A22 \ --tensor-parallel-size 16 \ --enable-expert-parallel \ - --additional-config '{ + --additional-config '{ "eplb_config": { "dynamic_eplb": true, - "num_iterations_eplb_update": 400, - "num_wait_worker_iterations": 30 - }' + "expert_heat_collection_interval": 400, + "algorithm_execution_interval": 30 + }}' ``` ### Static EPLB @@ -49,12 +49,12 @@ We need to add environment variable `export EXPERT_MAP_RECORD="true"` to record vllm serve Qwen/Qwen3-235B-A22 \ --tensor-parallel-size 16 \ --enable-expert-parallel \ - --additional-config '{ + --additional-config '{ "eplb_config": { "expert_map_record_path": "/path/to/eplb.json", - "init_redundancy_expert": 16, - "num_iterations_eplb_update": 400, - "num_wait_worker_iterations": 30 - }' + "num_redundant_experts": 16, + "expert_heat_collection_interval": 400, + "algorithm_execution_interval": 30 + }}' ``` #### Subsequent Deployments (Use Recorded Map) @@ -73,9 +73,9 @@ vllm serve Qwen/Qwen3-235B-A22 \ ## Critical Considerations 1. Parameter Tuning: - - num_iterations_eplb_update: Higher values (e.g., 400+) for stable workloads; lower values (e.g., 100-200) for fluctuating traffic. - - num_wait_worker_iterations: Should be ≥ 30 to avoid premature balancing during startup. - - init_redundancy_expert: Must match tensor-parallel size (e.g., 16 for 16 GPUs) to ensure sufficient redundancy. + - expert_heat_collection_interval: Higher values (e.g., 400+) for stable workloads; lower values (e.g., 100-200) for fluctuating traffic. + - algorithm_execution_interval: Should be ≥ 30 to avoid premature balancing during startup. + - num_redundant_experts: Must match tensor-parallel size (e.g., 16 for 16 GPUs) to ensure sufficient redundancy. 2. Hardware Requirements: - Ensure that all GPUs have identical memory capacity and compute capabilities. @@ -85,20 +85,16 @@ vllm serve Qwen/Qwen3-235B-A22 \ - Only MoE models with explicit expert parallelism support (e.g., Qwen3 MoE models) are compatible. - Verify model architecture supports dynamic expert routing through --enable-expert-parallel. -4. Gating Configuration: - - When gate_eplb=true, validate that the gating mechanism can handle expert movement without routing errors. - - Test with synthetic workloads before production deployment. - -5. Monitoring & Validation: +4. Monitoring & Validation: - Track metrics: expert_load_balance_ratio, ttft_p99, tpot_avg, and gpu_utilization. - Use vllm monitor to detect imbalances during runtime. - Always verify expert map JSON structure before loading (validate with jq or similar tools). -6. Startup Behavior: +5. Startup Behavior: - Initial requests may experience higher latency during the first balancing cycle (typically 1-2 minutes). - Avoid sudden traffic spikes during the warm-up phase. -7. Common Pitfalls: +6. Common Pitfalls: - Incorrect tensor-parallel-size vs. actual GPU count → causes resource underutilization. - Using expert_map_path without generating the map first → runtime errors. - - Setting init_redundancy_expert > available GPUs → system failure. + - Setting num_redundant_experts > available GPUs → system failure. diff --git a/tests/e2e/multicard/2-cards/test_qwen3_moe.py b/tests/e2e/multicard/2-cards/test_qwen3_moe.py index bdb9bd3a..4dcc3f29 100644 --- a/tests/e2e/multicard/2-cards/test_qwen3_moe.py +++ b/tests/e2e/multicard/2-cards/test_qwen3_moe.py @@ -105,10 +105,12 @@ async def test_qwen3_moe_w8a8_distributed_tp2_ep_dynamic_eplb(): # during initialization in offline mode, so the online mode is used instead. env_dict.update({"DYNAMIC_EPLB": "true"}) additional_config = { - "dynamic_eplb": True, - "num_iterations_eplb_update": 100, - "num_wait_worker_iterations": 20, - "num_redundant_experts": 2 + "eplb_config": { + "dynamic_eplb": True, + "expert_heat_collection_interval": 100, + "algorithm_execution_interval": 20, + "num_redundant_experts": 2 + } } server_args.extend(["--additional-config", json.dumps(additional_config)]) with RemoteOpenAIServer(model, diff --git a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml index 8eae6d2e..302f4d52 100644 --- a/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/DeepSeek-R1-W8A8-EPLB.yaml @@ -55,7 +55,7 @@ deployment: } }' --additional-config - '{"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"eplb_config": {"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}}' - server_cmd: > @@ -92,7 +92,7 @@ deployment: } }' --additional-config - '{"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"eplb_config": {"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -130,7 +130,7 @@ deployment: } }' --additional-config - '{"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -167,7 +167,7 @@ deployment: } }' --additional-config - '{"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"multistream_overlap_shared_expert":true,"eplb_config": {"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}}' benchmarks: perf: case_type: performance diff --git a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml index a8df4407..b21d8b2c 100644 --- a/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/Qwen3-235B-W8A8-EPLB.yaml @@ -51,7 +51,7 @@ deployment: } }' --additional-config - '{"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"eplb_config": {"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}}' - server_cmd: > @@ -87,5 +87,5 @@ deployment: } }' --additional-config - '{"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"eplb_config": {"dynamic_eplb":true,"expert_heat_collection_interval":2048,"algorithm_execution_interval":200}}' benchmarks: diff --git a/tests/e2e/nightly/single_node/models/test_deepseek_r1_0528_w8a8_eplb.py b/tests/e2e/nightly/single_node/models/test_deepseek_r1_0528_w8a8_eplb.py index c593761b..c26c8ec2 100644 --- a/tests/e2e/nightly/single_node/models/test_deepseek_r1_0528_w8a8_eplb.py +++ b/tests/e2e/nightly/single_node/models/test_deepseek_r1_0528_w8a8_eplb.py @@ -70,11 +70,12 @@ async def test_models(model: str) -> None: additional_config: dict[str, Any] = { "enable_shared_expert_dp": False, "multistream_overlap_shared_expert": False, - "dynamic_eplb": True, - "num_iterations_eplb_update": 14000, - "num_wait_worker_iterations": 30, - "init_redundancy_expert": 0, - "gate_eplb": False + "eplb_config": { + "dynamic_eplb": True, + "expert_heat_collection_interval": 512, + "algorithm_execution_interval": 100, + "num_redundant_experts": 0 + } } server_args = [ "--quantization", "ascend", "--seed", "1024", diff --git a/tests/e2e/nightly/single_node/models/test_qwen3_235b_a22b_w8a8_eplb.py b/tests/e2e/nightly/single_node/models/test_qwen3_235b_a22b_w8a8_eplb.py index e3b3e4f6..50a6ae53 100644 --- a/tests/e2e/nightly/single_node/models/test_qwen3_235b_a22b_w8a8_eplb.py +++ b/tests/e2e/nightly/single_node/models/test_qwen3_235b_a22b_w8a8_eplb.py @@ -70,13 +70,13 @@ async def test_models(model: str) -> None: "8192", "--max-num-seqs", "12", "--trust-remote-code", "--gpu-memory-utilization", "0.9" ] - env_dict["EXPERT_MAP_RECORD"] = "true" env_dict["DYNAMIC_EPLB"] = "true" - additional_config["dynamic_eplb"] = True - additional_config["num_iterations_eplb_update"] = 14000 - additional_config["num_wait_worker_iterations"] = 30 - additional_config["init_redundancy_expert"] = 0 - additional_config["gate_eplb"] = False + additional_config["eplb_config"] = { + "dynamic_eplb": True, + "expert_heat_collection_interval": 512, + "algorithm_execution_interval": 100, + "num_redundant_experts": 0 + } server_args.extend( ["--compilation-config", json.dumps(compilation_config)]) diff --git a/tests/ut/eplb/core/test_eplb_utils.py b/tests/ut/eplb/core/test_eplb_utils.py index 2b62d850..4d7f8fee 100644 --- a/tests/ut/eplb/core/test_eplb_utils.py +++ b/tests/ut/eplb/core/test_eplb_utils.py @@ -1,10 +1,8 @@ import os -import sys import unittest from unittest.mock import patch # isort: off -import pytest import torch from vllm.config import VllmConfig from vllm.model_executor.layers.fused_moe.config import (FusedMoEConfig, @@ -12,7 +10,7 @@ from vllm.model_executor.layers.fused_moe.config import (FusedMoEConfig, ) from vllm_ascend.ascend_config import init_ascend_config -from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils, init_eplb_config +from vllm_ascend.eplb.core.eplb_utils import init_eplb_config # isort: on @@ -20,23 +18,28 @@ class TestAscendConfig(unittest.TestCase): def setUp(self): vllm_config = VllmConfig() - ascend_config = init_ascend_config(vllm_config) - ascend_config.dynamic_eplb = True - ascend_config.init_redundancy_expert = 2 + vllm_config.additional_config = { + "refresh": True, + "eplb_config": { + "dynamic_eplb": True, + "num_redundant_experts": 2 + } + } moe_parallel_config = FusedMoEParallelConfig(2, 0, 1, 2, 1, 1, 1, 1, True, "hccl") moe_config = FusedMoEConfig(8, 8, 8192, 5, moe_parallel_config, torch.float16) moe_config.supports_eplb = True - self.ascend_config = ascend_config + self.vllm_config = vllm_config self.moe_config = moe_config self.mock_npu = patch("torch.Tensor.npu", new=lambda self: self).start() self.rank = 1 def test_init_eplb_config_with_eplb(self): + eplb_config = init_ascend_config(self.vllm_config).eplb_config expert_map, log2phy, redundant_experts = init_eplb_config( - self.ascend_config, 0, self.moe_config) + eplb_config, 0, self.moe_config) gt_expert_map = torch.tensor([4, -1, -1, -1, 0, 1, 2, 3]) gt_log2phy = torch.tensor([9, 1, 2, 3, 5, 6, 7, 8]) self.assertTrue(torch.equal(expert_map[self.rank], gt_expert_map)) @@ -45,9 +48,11 @@ class TestAscendConfig(unittest.TestCase): def test_init_eplb_config_with_eplb_withmap(self): _TEST_DIR = os.path.dirname(__file__) - self.ascend_config.expert_map_path = _TEST_DIR + "/expert_map.json" + self.vllm_config.additional_config["eplb_config"][ + "expert_map_path"] = _TEST_DIR + "/expert_map.json" + eplb_config = init_ascend_config(self.vllm_config).eplb_config expert_map, log2phy, redundant_experts = init_eplb_config( - self.ascend_config, 0, self.moe_config) + eplb_config, 0, self.moe_config) gt_expert_map = torch.tensor([-1, 1, 4, -1, 2, -1, 0, 3]) gt_log2phy = torch.tensor([2, 6, 9, 3, 7, 4, 5, 8]) self.assertTrue(torch.equal(expert_map[self.rank], gt_expert_map)) @@ -55,159 +60,11 @@ class TestAscendConfig(unittest.TestCase): self.assertEqual(redundant_experts, 2) def test_init_eplb_config_without_eplb(self): - self.ascend_config.dynamic_eplb = False - self.ascend_config.expert_map_path = None + self.vllm_config.additional_config = {"refresh": True} + eplb_config = init_ascend_config(self.vllm_config).eplb_config expert_map, log2phy, redundant_experts = init_eplb_config( - self.ascend_config, 0, self.moe_config) + eplb_config, 0, self.moe_config) gt_expert_map = torch.tensor([-1, -1, -1, -1, 0, 1, 2, 3]) print(expert_map, log2phy, redundant_experts) self.assertTrue(torch.equal(expert_map[self.rank], gt_expert_map)) self.assertEqual(redundant_experts, 0) - - -class TestEPLBParamUtils: - - def test_check_iterations_valid(self): - EPLBParamUtils.check_iterations(1) - EPLBParamUtils.check_iterations(100) - - def test_check_iterations_type_error(self): - with pytest.raises(TypeError, match="is not int"): - EPLBParamUtils.check_iterations("abc") - with pytest.raises(TypeError, match="is not int"): - EPLBParamUtils.check_iterations(1.5) - with pytest.raises(TypeError, match="is not int"): - EPLBParamUtils.check_iterations(None) - - def test_check_iterations_value_error_less_than_or_equal_zero(self): - with pytest.raises(ValueError, - match="can not less than or equal to 0"): - EPLBParamUtils.check_iterations(0) - with pytest.raises(ValueError, - match="can not less than or equal to 0"): - EPLBParamUtils.check_iterations(-1) - - def test_check_iterations_value_error_large_than_sys_maxsize(self): - large_value = sys.maxsize + 1 - with pytest.raises(ValueError, - match=f"can not large than {sys.maxsize}"): - EPLBParamUtils.check_iterations(large_value) - - def test_check_dynamic_eplb_none(self): - EPLBParamUtils.check_dynamic_eplb(None) - - def test_check_dynamic_eplb_valid_bool(self): - EPLBParamUtils.check_dynamic_eplb(False) - - def test_check_dynamic_eplb_type_error(self): - with pytest.raises(TypeError, match="The dynamic_eplb is not bool."): - EPLBParamUtils.check_dynamic_eplb("true") - with pytest.raises(TypeError, match="The dynamic_eplb is not bool."): - EPLBParamUtils.check_dynamic_eplb(1) - - def test_check_dynamic_eplb_value_error_env_not_set(self, monkeypatch): - monkeypatch.delenv("DYNAMIC_EPLB", raising=False) - with pytest.raises( - ValueError, - match= - 'Can not enable dynamic_eplb when DYNAMIC_EPLB is not set to "true" or "1".' - ): - EPLBParamUtils.check_dynamic_eplb(True) - - monkeypatch.setenv("DYNAMIC_EPLB", "false") - with pytest.raises( - ValueError, - match= - 'Can not enable dynamic_eplb when DYNAMIC_EPLB is not set to "true" or "1".' - ): - EPLBParamUtils.check_dynamic_eplb(True) - - monkeypatch.setenv("DYNAMIC_EPLB", "any_other_value") - with pytest.raises( - ValueError, - match= - 'Can not enable dynamic_eplb when DYNAMIC_EPLB is not set to "true" or "1".' - ): - EPLBParamUtils.check_dynamic_eplb(True) - - def test_check_dynamic_eplb_valid_with_env_set(self, monkeypatch): - monkeypatch.setenv("DYNAMIC_EPLB", "true") - EPLBParamUtils.check_dynamic_eplb(True) - - monkeypatch.setenv("DYNAMIC_EPLB", "True") - EPLBParamUtils.check_dynamic_eplb(True) - - monkeypatch.setenv("DYNAMIC_EPLB", "1") - EPLBParamUtils.check_dynamic_eplb(True) - - def test_check_expert_map_path_none(self): - EPLBParamUtils.check_expert_map_path(None) - - def test_check_expert_map_path_type_error_not_string(self): - with pytest.raises(TypeError, match="The expert_map is not str."): - EPLBParamUtils.check_expert_map_path(123) - with pytest.raises(TypeError, match="The expert_map is not str."): - EPLBParamUtils.check_expert_map_path(True) - - def test_check_expert_map_path_value_error_empty_string(self): - with pytest.raises(ValueError, match="The expert_map is not empty."): - EPLBParamUtils.check_expert_map_path("") - with pytest.raises(ValueError, match="The expert_map is not empty."): - EPLBParamUtils.check_expert_map_path(" ") - - def test_check_expert_map_path_type_error_incorrect_extension(self): - with pytest.raises(TypeError, match="The expert_map is not json."): - EPLBParamUtils.check_expert_map_path("path/to/map.txt") - with pytest.raises(TypeError, match="The expert_map is not json."): - EPLBParamUtils.check_expert_map_path("path/to/map.JSON_") - - @patch('os.path.exists', return_value=False) - def test_check_expert_map_path_value_error_not_exist(self, mock_exists): - with pytest.raises(ValueError, match="The expert_map is not exist."): - EPLBParamUtils.check_expert_map_path("non_existent_map.json") - mock_exists.assert_called_once_with("non_existent_map.json") - - def test_check_expert_map_record_path_none(self): - EPLBParamUtils.check_expert_map_record_path(None) - - def test_check_expert_map_record_path_type_error_not_string(self): - with pytest.raises(TypeError, - match="The expert_map_record_path is not str."): - EPLBParamUtils.check_expert_map_record_path(123) - with pytest.raises(TypeError, - match="The expert_map_record_path is not str."): - EPLBParamUtils.check_expert_map_record_path(False) - - def test_check_expert_map_record_path_value_error_empty_string(self): - with pytest.raises(ValueError, - match="The expert_map_record_path is empty."): - EPLBParamUtils.check_expert_map_record_path("") - with pytest.raises(ValueError, - match="The expert_map_record_path is empty."): - EPLBParamUtils.check_expert_map_record_path(" ") - - def test_check_expert_map_record_path_type_error_incorrect_extension(self): - with pytest.raises(TypeError, - match="The expert_map_record_path is not json."): - EPLBParamUtils.check_expert_map_record_path("path/to/record.txt") - with pytest.raises(TypeError, - match="The expert_map_record_path is not json."): - EPLBParamUtils.check_expert_map_record_path("path/to/record.XML") - - def test_check_expert_map_record_path_value_error_env_not_set( - self, monkeypatch): - monkeypatch.delenv("EXPERT_MAP_RECORD", raising=False) - with pytest.raises( - ValueError, - match= - 'Can not enable expert_map_record_path when not export EXPERT_MAP_RECORD="true".' - ): - EPLBParamUtils.check_expert_map_record_path("path/to/record.json") - - monkeypatch.setenv("EXPERT_MAP_RECORD", "false") - with pytest.raises( - ValueError, - match= - 'Can not enable expert_map_record_path when not export EXPERT_MAP_RECORD="true".' - ): - EPLBParamUtils.check_expert_map_record_path("path/to/record.json") diff --git a/tests/ut/quantization/test_w4a16.py b/tests/ut/quantization/test_w4a16.py index 5d50e738..71d0298c 100644 --- a/tests/ut/quantization/test_w4a16.py +++ b/tests/ut/quantization/test_w4a16.py @@ -101,8 +101,8 @@ class TestAscendW4A16FusedMoEMethod(TestBase): @patch("vllm_ascend.quantization.w4a16.get_current_vllm_config") def setUp(self, mock_get_current_vllm_config, mock_get_ascend_config): mock_ascend_config = Mock() - mock_ascend_config.dynamic_eplb = False - mock_ascend_config.expert_map_record_path = None + mock_ascend_config.eplb_config.dynamic_eplb = False + mock_ascend_config.eplb_config.expert_map_record_path = None mock_get_ascend_config.return_value = mock_ascend_config mock_vllm_config = Mock() diff --git a/tests/ut/quantization/test_w4a8_dynamic.py b/tests/ut/quantization/test_w4a8_dynamic.py index 3ed2a877..70ce13fa 100644 --- a/tests/ut/quantization/test_w4a8_dynamic.py +++ b/tests/ut/quantization/test_w4a8_dynamic.py @@ -136,7 +136,7 @@ class TestAscendW4A8DynamicFusedMoEMethod(TestBase): get_current_vllm_config, mock_get_ascend_config): # Mock ascend config mock_ascend_config = Mock() - mock_ascend_config.dynamic_eplb = False + mock_ascend_config.eplb_config.dynamic_eplb = False mock_get_ascend_config.return_value = mock_ascend_config mock_vllm_config = Mock() diff --git a/tests/ut/test_ascend_config.py b/tests/ut/test_ascend_config.py index 5cccc027..614f5aeb 100644 --- a/tests/ut/test_ascend_config.py +++ b/tests/ut/test_ascend_config.py @@ -37,7 +37,6 @@ class TestAscendConfig(TestBase): test_vllm_config = VllmConfig() # No additional config given, check the default value here. ascend_config = init_ascend_config(test_vllm_config) - self.assertIsNone(ascend_config.expert_map_path) self.assertFalse(ascend_config.multistream_overlap_shared_expert) self.assertFalse(ascend_config.enable_kv_nz) @@ -52,12 +51,14 @@ class TestAscendConfig(TestBase): "fuse_norm_quant": False, }, "multistream_overlap_shared_expert": True, - "expert_map_path": "test_expert_map_path", + "eplb_config": { + "num_redundant_experts": 2 + }, "refresh": True, "enable_kv_nz": False } ascend_config = init_ascend_config(test_vllm_config) - self.assertEqual(ascend_config.expert_map_path, "test_expert_map_path") + self.assertEqual(ascend_config.eplb_config.num_redundant_experts, 2) self.assertTrue(ascend_config.multistream_overlap_shared_expert) self.assertFalse(ascend_config.enable_npugraph_ex) diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py index 51f28b60..49a5409a 100644 --- a/vllm_ascend/ascend_config.py +++ b/vllm_ascend/ascend_config.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os from typing import TYPE_CHECKING, Optional from vllm.logger import logger @@ -44,6 +45,9 @@ class AscendConfig: self.finegrained_tp_config = FinegrainedTPConfig( finegrained_tp_config, vllm_config) + eplb_config = additional_config.get("eplb_config", {}) + self.eplb_config = EplbConfig(eplb_config) + # Dump / PrecisionDebugger configuration self.dump_config_path = additional_config.get("dump_config_path", None) @@ -58,20 +62,6 @@ class AscendConfig: "using it without these features may result in significant performance degradation." ) - # Todo: Once https://github.com/vllm-project/vllm/issues/22246 is merged in vllm. Remove this config - self.expert_map_path = additional_config.get("expert_map_path", None) - self.eplb_policy_type = additional_config.get("eplb_policy_type", 1) - self.expert_map_record_path = additional_config.get( - "expert_map_record_path", - None) # Provide path to export expert map - self.init_redundancy_expert = additional_config.get( - "init_redundancy_expert", 0) - self.dynamic_eplb = additional_config.get("dynamic_eplb", False) - self.num_iterations_eplb_update = additional_config.get( - "num_iterations_eplb_update", 400) - self.gate_eplb = additional_config.get("gate_eplb", False) - self.num_wait_worker_iterations = additional_config.get( - "num_wait_worker_iterations", 30) self.enable_shared_expert_dp = additional_config.get( "enable_shared_expert_dp", False) and vllm_config.parallel_config.enable_expert_parallel @@ -275,6 +265,62 @@ class WeightPrefetchConfig: "prefetch_ratio", self.prefetch_ratio) +class EplbConfig: + """ + Configuration Object for xlite_graph_config from additional_config + """ + _defaults = { + "dynamic_eplb": False, + "expert_map_path": None, + "expert_heat_collection_interval": 400, + "algorithm_execution_interval": 30, + "expert_map_record_path": None, + "num_redundant_experts": 0, + "eplb_policy_type": 1 + } + + def __init__(self, user_config: dict = {}): + self.config = self._defaults.copy() + if user_config and isinstance(user_config, dict): + for key, value in user_config.items(): + if key in self.config: + self.config[key] = value + else: + raise ValueError(f"Config has no attribute '{key}'") + + self._validate_config() + + def __getattr__(self, key): + if key in self.config: + return self.config[key] + raise AttributeError(f"Config has no attribute '{key}'") + + def _validate_config(self): + if self.expert_map_path is not None: + if self.expert_map_path[-5:] != ".json": + raise TypeError("The expert_map is not json.") + if not os.path.exists(self.expert_map_path): + raise ValueError("The expert_map is not exist.") + if self.expert_map_record_path is not None: + self.config["dynamic_eplb"] = True + if self.expert_map_record_path[-5:] != ".json": + raise TypeError("The expert_map_record_path is not json.") + dirname = os.path.dirname(self.expert_map_record_path) + os.makedirs(dirname, exist_ok=True) + for key in [ + "expert_heat_collection_interval", + "algorithm_execution_interval", "num_redundant_experts" + ]: + if not isinstance(self.config[key], int): + raise TypeError(f"{key} must be an integer") + if self.config[key] < 0: # type: ignore + raise ValueError( + f"{key} must greater than 0; got {self.config[key]} instead" + ) + if self.eplb_policy_type not in [0, 1, 2, 3]: + raise ValueError("eplb_policy_type must in [0, 1, 2, 3]") + + _ASCEND_CONFIG: Optional[AscendConfig] = None diff --git a/vllm_ascend/ascend_forward_context.py b/vllm_ascend/ascend_forward_context.py index cf4af587..092544b7 100644 --- a/vllm_ascend/ascend_forward_context.py +++ b/vllm_ascend/ascend_forward_context.py @@ -242,8 +242,7 @@ def select_moe_comm_method(num_tokens: int, moe_comm_type = MoECommType.ALLGATHER elif soc_version in {AscendDeviceType.A3}: - ascend_config = get_ascend_config() - dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path + dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb # TODO: drop the EP-size guard when dispatch_ffn_combine supports larger EP sizes # TODO: drop speculative method guard when dispatch_gmm_combine_decode supports w16a16 fused_mc2_enable = envs_ascend.VLLM_ASCEND_ENABLE_FUSED_MC2 and quant_type == "w8a8_dynamic" diff --git a/vllm_ascend/eplb/core/eplb_utils.py b/vllm_ascend/eplb/core/eplb_utils.py index 21a7c2e5..4faab717 100644 --- a/vllm_ascend/eplb/core/eplb_utils.py +++ b/vllm_ascend/eplb/core/eplb_utils.py @@ -17,15 +17,12 @@ # Todo: Once https://github.com/vllm-project/vllm/issues/22246 is merged in vllm. Remove eplb utils. import json import os.path -import sys from collections import defaultdict import numpy as np import torch from vllm.logger import logger -import vllm_ascend.envs as envs_ascend - def expert_file_to_tensor(expert_map_path, layer_id): with open(expert_map_path, "r") as f: @@ -56,13 +53,13 @@ def generate_global_placement(n_expert, ep_size, n_redundant): return torch.tensor(groups, dtype=torch.int32) -def init_eplb_config(ascend_config, layer_id, moe_config): - expert_map_path = ascend_config.expert_map_path +def init_eplb_config(eplb_config, layer_id, moe_config): + expert_map_path = eplb_config.expert_map_path n_experts = moe_config.num_experts ep_size = moe_config.ep_size global_placement = None - eplb_enable = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path - n_redundant = ascend_config.init_redundancy_expert if eplb_enable else 0 + eplb_enable = eplb_config.dynamic_eplb + n_redundant = eplb_config.num_redundant_experts if eplb_enable else 0 if expert_map_path: if not (os.path.exists(expert_map_path) and os.access(expert_map_path, os.R_OK)): @@ -83,6 +80,7 @@ def init_eplb_config(ascend_config, layer_id, moe_config): n_redundant) if ep_size == 1: + assert not eplb_enable, "EPLB must used in expert parallelism." return None, None, n_redundant global_expert_map = [] for rankid in range(ep_size): @@ -116,73 +114,3 @@ def generate_log2phy_map(global_expert_map, ep_rank): torch.tensor(list(log2phy_map.values()), dtype=torch.int32)) return log2phy_map - - -class EPLBParamUtils: - - @staticmethod - def check_iterations(iterations): - if not isinstance(iterations, int): - raise TypeError(f"The {iterations} is not int.") - if iterations <= 0: - raise ValueError( - f"The {iterations} can not less than or equal to 0.") - if iterations > sys.maxsize: - raise ValueError( - f"The {iterations} can not large than {sys.maxsize}") - - @staticmethod - def check_dynamic_eplb(dynamic_eplb): - if dynamic_eplb is None: - return - if not isinstance(dynamic_eplb, bool): - raise TypeError("The dynamic_eplb is not bool.") - - if dynamic_eplb and envs_ascend.DYNAMIC_EPLB not in ("true", "1"): - raise ValueError( - 'Can not enable dynamic_eplb when DYNAMIC_EPLB is not set to "true" or "1".' - ) - - @staticmethod - def check_expert_map_path(expert_map): - if expert_map is None: - return - if not isinstance(expert_map, str): - raise TypeError("The expert_map is not str.") - if not expert_map.strip(): - raise ValueError("The expert_map is not empty.") - _, ext = os.path.splitext(expert_map) - if ext.lower() != ".json": - raise TypeError("The expert_map is not json.") - if not os.path.exists(expert_map): - raise ValueError("The expert_map is not exist.") - try: - with open(expert_map, "w", encoding='utf-8') as f: - f.read() - except Exception as e: - raise IOError( - f"Fail read expert info from {expert_map}, please check the reading permission of {expert_map} : {e}" - ) - - @staticmethod - def check_expert_map_record_path(expert_map_record_path): - if expert_map_record_path is None: - return - if not isinstance(expert_map_record_path, str): - raise TypeError("The expert_map_record_path is not str.") - if not expert_map_record_path.strip(): - raise ValueError("The expert_map_record_path is empty.") - _, ext = os.path.splitext(expert_map_record_path) - if ext.lower() != ".json": - raise TypeError("The expert_map_record_path is not json.") - if os.getenv("EXPERT_MAP_RECORD", "false") != "true": - raise ValueError( - 'Can not enable expert_map_record_path when not export EXPERT_MAP_RECORD="true".' - ) - try: - with open(expert_map_record_path, "w", encoding='utf-8') as f: - f.write("") - except Exception as e: - raise IOError( - f"Fail write expert info to {expert_map_record_path}, please check the writing permission of {expert_map_record_path} : {e}" - ) diff --git a/vllm_ascend/eplb/eplb_updator.py b/vllm_ascend/eplb/eplb_updator.py index a8d2953d..15468e9f 100644 --- a/vllm_ascend/eplb/eplb_updator.py +++ b/vllm_ascend/eplb/eplb_updator.py @@ -21,16 +21,15 @@ import torch.distributed as dist import vllm.envs as envs from vllm.logger import logger -from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils from vllm_ascend.eplb.core.eplb_worker import EplbProcess class EplbUpdator: - def __init__(self, ascend_config, loader, eplb_process: EplbProcess, + def __init__(self, eplb_config, loader, eplb_process: EplbProcess, process): - self.ascend_config = ascend_config - self.init_eplb(self.ascend_config.expert_map_path, process) + self.eplb_config = eplb_config + self.init_eplb(self.eplb_config.expert_map_path, process) self.eplb_loader = loader self.eplb_process = eplb_process self.shared_dict = self.eplb_process.shared_dict @@ -45,28 +44,24 @@ class EplbUpdator: self.rank_id = dist.get_rank() self.num_expert_load_gather = 10 self.periodic_load_gather = True - self.num_iterations_eplb_update: torch.int64 = self.ascend_config.num_iterations_eplb_update - EPLBParamUtils.check_iterations(self.num_iterations_eplb_update) + self.expert_heat_collection_interval: torch.int64 = self.eplb_config.expert_heat_collection_interval self.expert_map_path = expert_map_path - self.expert_map_record_path = self.ascend_config.expert_map_record_path + self.expert_map_record_path = self.eplb_config.expert_map_record_path try: if not envs.VLLM_ALLOW_EXPERT_LOAD_COLLECTING: - self.num_expert_load_gather = self.num_iterations_eplb_update + self.num_expert_load_gather = self.expert_heat_collection_interval self.periodic_load_gather = False except Exception: - self.num_expert_load_gather = self.num_iterations_eplb_update + self.num_expert_load_gather = self.expert_heat_collection_interval self.periodic_load_gather = False - self.gate_eplb = self.ascend_config.gate_eplb - self.reqs = [] self.update_info_all = [] self.cur_iterations: torch.int64 = 0 - self.num_wait_worker_iterations: torch.int64 = self.ascend_config.num_wait_worker_iterations - EPLBParamUtils.check_iterations(self.num_wait_worker_iterations) + self.algorithm_execution_interval: torch.int64 = self.eplb_config.algorithm_execution_interval self.process = process @@ -75,8 +70,8 @@ class EplbUpdator: def update_iteration(self): self.cur_iterations += 1 - if self.cur_iterations == (self.num_iterations_eplb_update + \ - self.num_wait_worker_iterations + self.num_moe_layers): + if self.cur_iterations == (self.expert_heat_collection_interval + \ + self.algorithm_execution_interval + self.num_moe_layers): logger.info("Finish expert parallel load balancing.") if self.expert_map_record_path is not None: self.adaptor._export_tensor_to_file( @@ -84,19 +79,20 @@ class EplbUpdator: self.expert_map_record_path) self.adaptor.model.clear_all_moe_loads() - if not self.gate_eplb: - self.cur_iterations = 0 + self.cur_iterations = 0 def get_update_info_flag(self): - return self.cur_iterations == (self.num_iterations_eplb_update + - self.num_wait_worker_iterations - 1) + return self.cur_iterations == (self.expert_heat_collection_interval + + self.algorithm_execution_interval - 1) def wakeup_eplb_worker_flag(self): - return self.cur_iterations == (self.num_iterations_eplb_update - 1) + return self.cur_iterations == (self.expert_heat_collection_interval - + 1) def update_expert_weight_flag(self): weight_update_counter = self.cur_iterations - ( - self.num_iterations_eplb_update + self.num_wait_worker_iterations) + self.expert_heat_collection_interval + + self.algorithm_execution_interval) return (weight_update_counter >= 0 and weight_update_counter < self.num_moe_layers) diff --git a/vllm_ascend/ops/fused_moe/fused_moe.py b/vllm_ascend/ops/fused_moe/fused_moe.py index cdf2aa6d..8d762836 100644 --- a/vllm_ascend/ops/fused_moe/fused_moe.py +++ b/vllm_ascend/ops/fused_moe/fused_moe.py @@ -55,7 +55,7 @@ class AscendUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod): def __init__(self, moe: FusedMoEConfig = None): super().__init__(moe=moe) - self.dynamic_eplb = get_ascend_config().dynamic_eplb + self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb def process_weights_after_loading(self, layer): super(UnquantizedFusedMoEMethod, @@ -187,14 +187,14 @@ class AscendFusedMoE(FusedMoE): dtype=vllm_config.model_config.dtype) # init moe + eplb_config = ascend_config.eplb_config self.global_expert_map, self.log2phy, self.global_redundant_expert_num = init_eplb_config( - ascend_config, self.moe_instance_id, self.moe_config) + eplb_config, self.moe_instance_id, self.moe_config) if self.global_expert_map is not None: self._expert_map = self.global_expert_map[self.ep_rank].npu() self.global_num_experts = num_experts + self.global_redundant_expert_num - self.dynamic_eplb = (ascend_config.dynamic_eplb - or ascend_config.expert_map_record_path) and ( - self.log2phy is not None) + self.dynamic_eplb = eplb_config.dynamic_eplb and (self.log2phy + is not None) self.local_num_experts = (torch.sum( self._expert_map != -1).item() if self._expert_map is not None else self.global_num_experts) diff --git a/vllm_ascend/quantization/w4a16.py b/vllm_ascend/quantization/w4a16.py index c6eb379d..3767593f 100644 --- a/vllm_ascend/quantization/w4a16.py +++ b/vllm_ascend/quantization/w4a16.py @@ -116,8 +116,7 @@ class AscendW4A16FusedMoEMethod: vllm_config = get_current_vllm_config() self.group_size = vllm_config.quant_config.quant_description.get( "group_size", 32) - ascend_config = get_ascend_config() - self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path + self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb def get_weight( self, diff --git a/vllm_ascend/quantization/w4a8_dynamic.py b/vllm_ascend/quantization/w4a8_dynamic.py index 167a42fc..21a43028 100644 --- a/vllm_ascend/quantization/w4a8_dynamic.py +++ b/vllm_ascend/quantization/w4a8_dynamic.py @@ -224,8 +224,7 @@ class AscendW4A8DynamicFusedMoEMethod: # NOTE: new quantize weights: 2 int4 pack into int8 self.new_quant_version = quant_version == "1.0.0" self.tp_size = 1 if vllm_config.parallel_config.enable_expert_parallel else self.ep_group.world_size - ascend_config = get_ascend_config() - self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path + self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb if self.new_quant_version and self.tp_size > 16: raise ValueError( "The current weight does not support moe part tp>16.") diff --git a/vllm_ascend/quantization/w8a8_dynamic.py b/vllm_ascend/quantization/w8a8_dynamic.py index b2e92e6e..103b6c10 100644 --- a/vllm_ascend/quantization/w8a8_dynamic.py +++ b/vllm_ascend/quantization/w8a8_dynamic.py @@ -114,7 +114,7 @@ class AscendW8A8DynamicFusedMoEMethod: and not vllm_config.model_config.enforce_eager) self.multistream_overlap_gate = ascend_config.multistream_overlap_gate - self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path + self.dynamic_eplb = ascend_config.eplb_config.dynamic_eplb self.in_dtype = vllm_config.model_config.dtype self.supports_eplb = True diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 636cbfdd..b7497a8e 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -91,7 +91,6 @@ from vllm_ascend.compilation.acl_graph import (ACLGraphWrapper, from vllm_ascend.eplb.adaptor.vllm_adaptor import VllmEplbAdaptor from vllm_ascend.eplb.core.eplb_device_transfer_loader import \ D2DExpertWeightLoader -from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils from vllm_ascend.eplb.core.eplb_worker import EplbProcess from vllm_ascend.eplb.eplb_updator import EplbUpdator from vllm_ascend.eplb.utils import model_register @@ -290,13 +289,11 @@ class NPUModelRunner(GPUModelRunner): self.use_aclgraph = self._use_aclgraph() - self.dynamic_eplb = self.ascend_config.dynamic_eplb or self.ascend_config.expert_map_record_path + eplb_config = self.ascend_config.eplb_config + self.dynamic_eplb = eplb_config.dynamic_eplb if self.dynamic_eplb: - EPLBParamUtils.check_dynamic_eplb(self.ascend_config.dynamic_eplb) - EPLBParamUtils.check_expert_map_record_path( - self.ascend_config.expert_map_record_path) self.is_eplb_warmuped = False - self.policy_type = self.ascend_config.eplb_policy_type + self.policy_type = eplb_config.eplb_policy_type self.eplb_loader = D2DExpertWeightLoader() self.manager = Manager() self.shared_dict = self.manager.dict({ @@ -308,8 +305,7 @@ class NPUModelRunner(GPUModelRunner): policy_type=self.policy_type, enable_d2d=True) self.process = self.eplb_process._launch_process() - ascend_config = get_ascend_config() - self.eplb_updator = EplbUpdator(ascend_config, self.eplb_loader, + self.eplb_updator = EplbUpdator(eplb_config, self.eplb_loader, self.eplb_process, self.process) # Input Batch # NOTE(Chen): Ideally, we should initialize the input batch inside