[EPLB] EPLB config renaming (#5533)

### What this PR does / why we need it?
1. Rename `num_iterations_eplb_update` to `expert_heat_collection_interval`.
2. Rename `num_wait_worker_iterations` to `algorithm_execution_interval`.
3. Rename `init_redundancy_expert` to `num_redundant_experts`, matching the name vLLM uses for the same setting.
4. Delete `gate_eplb`, since this feature is no longer needed.
5. Move the EPLB options into an `eplb_config` dict inside `additional_config`.
6. Depends on pr5817.

### Does this PR introduce _any_ user-facing change?

Before this PR:
`--additional-config '{"dynamic_eplb": true, "num_iterations_eplb_update": 4000, "num_wait_worker_iterations": 150, "init_redundancy_expert": 16, "expert_map_path": "xxx.json"}'`

After this PR:
`--additional-config '{"eplb_config": {"dynamic_eplb": true, "expert_heat_collection_interval": 4000, "algorithm_execution_interval": 150, "num_redundant_experts": 16, "expert_map_path": "xxx.json"}}'`

### How was this patch tested?

#### Test: Qwen3-235B EPLB with `num_redundant_experts=16`

Without pr5817:
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 83.33 |

With pr5817:
| dataset | version | metric | mode | vllm-api-general-chat |
|----- | ----- | ----- | ----- | -----|
| aime2024 | 604a78 | accuracy | gen | 86.67 |

- vLLM version: v0.13.0
- vLLM main: 45c1ca1ca1

Signed-off-by: shenchuxiaofugui <1311027364@qq.com>
Commit da958ee386 (parent ea01aeaab7)
Author: LI SHENGYONG, committed by GitHub, 2026-01-15 10:26:44 +08:00
21 changed files with 174 additions and 349 deletions

View File

@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import TYPE_CHECKING, Optional
from vllm.logger import logger
@@ -44,6 +45,9 @@ class AscendConfig:
self.finegrained_tp_config = FinegrainedTPConfig(
finegrained_tp_config, vllm_config)
eplb_config = additional_config.get("eplb_config", {})
self.eplb_config = EplbConfig(eplb_config)
# Dump / PrecisionDebugger configuration
self.dump_config_path = additional_config.get("dump_config_path", None)
@@ -58,20 +62,6 @@ class AscendConfig:
"using it without these features may result in significant performance degradation."
)
# Todo: Once https://github.com/vllm-project/vllm/issues/22246 is merged in vllm. Remove this config
self.expert_map_path = additional_config.get("expert_map_path", None)
self.eplb_policy_type = additional_config.get("eplb_policy_type", 1)
self.expert_map_record_path = additional_config.get(
"expert_map_record_path",
None) # Provide path to export expert map
self.init_redundancy_expert = additional_config.get(
"init_redundancy_expert", 0)
self.dynamic_eplb = additional_config.get("dynamic_eplb", False)
self.num_iterations_eplb_update = additional_config.get(
"num_iterations_eplb_update", 400)
self.gate_eplb = additional_config.get("gate_eplb", False)
self.num_wait_worker_iterations = additional_config.get(
"num_wait_worker_iterations", 30)
self.enable_shared_expert_dp = additional_config.get(
"enable_shared_expert_dp",
False) and vllm_config.parallel_config.enable_expert_parallel
@@ -275,6 +265,62 @@ class WeightPrefetchConfig:
"prefetch_ratio", self.prefetch_ratio)
class EplbConfig:
"""
Configuration object for eplb_config from additional_config
"""
_defaults = {
"dynamic_eplb": False,
"expert_map_path": None,
"expert_heat_collection_interval": 400,
"algorithm_execution_interval": 30,
"expert_map_record_path": None,
"num_redundant_experts": 0,
"eplb_policy_type": 1
}
def __init__(self, user_config: dict = {}):
self.config = self._defaults.copy()
if user_config and isinstance(user_config, dict):
for key, value in user_config.items():
if key in self.config:
self.config[key] = value
else:
raise ValueError(f"Config has no attribute '{key}'")
self._validate_config()
def __getattr__(self, key):
if key in self.config:
return self.config[key]
raise AttributeError(f"Config has no attribute '{key}'")
def _validate_config(self):
if self.expert_map_path is not None:
if self.expert_map_path[-5:] != ".json":
raise TypeError("The expert_map_path is not a .json file.")
if not os.path.exists(self.expert_map_path):
raise ValueError("The expert_map_path does not exist.")
if self.expert_map_record_path is not None:
self.config["dynamic_eplb"] = True
if self.expert_map_record_path[-5:] != ".json":
raise TypeError("The expert_map_record_path is not json.")
dirname = os.path.dirname(self.expert_map_record_path)
os.makedirs(dirname, exist_ok=True)
for key in [
"expert_heat_collection_interval",
"algorithm_execution_interval", "num_redundant_experts"
]:
if not isinstance(self.config[key], int):
raise TypeError(f"{key} must be an integer")
if self.config[key] < 0: # type: ignore
raise ValueError(
f"{key} must greater than 0; got {self.config[key]} instead"
)
if self.eplb_policy_type not in [0, 1, 2, 3]:
raise ValueError("eplb_policy_type must in [0, 1, 2, 3]")
_ASCEND_CONFIG: Optional[AscendConfig] = None
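For reference, a minimal behavior sketch for the `EplbConfig` class above, re-derived from the hunk (not part of the diff): user keys override the listed defaults, unknown keys (including the old pre-rename names) raise `ValueError`, and values are read as attributes via `__getattr__`.

```python
# Behavior sketch for EplbConfig as defined above; not part of the diff.
cfg = EplbConfig({"dynamic_eplb": True, "num_redundant_experts": 16})
assert cfg.expert_heat_collection_interval == 400  # untouched default
assert cfg.num_redundant_experts == 16             # user override

# Old pre-rename keys are rejected instead of being silently ignored:
try:
    EplbConfig({"init_redundancy_expert": 16})
except ValueError as e:
    print(e)  # Config has no attribute 'init_redundancy_expert'
```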

View File

@@ -242,8 +242,7 @@ def select_moe_comm_method(num_tokens: int,
moe_comm_type = MoECommType.ALLGATHER
elif soc_version in {AscendDeviceType.A3}:
ascend_config = get_ascend_config()
dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb
# TODO: drop the EP-size guard when dispatch_ffn_combine supports larger EP sizes
# TODO: drop speculative method guard when dispatch_gmm_combine_decode supports w16a16
fused_mc2_enable = envs_ascend.VLLM_ASCEND_ENABLE_FUSED_MC2 and quant_type == "w8a8_dynamic"

View File

@@ -17,15 +17,12 @@
# Todo: Once https://github.com/vllm-project/vllm/issues/22246 is merged in vllm. Remove eplb utils.
import json
import os.path
import sys
from collections import defaultdict
import numpy as np
import torch
from vllm.logger import logger
import vllm_ascend.envs as envs_ascend
def expert_file_to_tensor(expert_map_path, layer_id):
with open(expert_map_path, "r") as f:
@@ -56,13 +53,13 @@ def generate_global_placement(n_expert, ep_size, n_redundant):
return torch.tensor(groups, dtype=torch.int32)
def init_eplb_config(ascend_config, layer_id, moe_config):
expert_map_path = ascend_config.expert_map_path
def init_eplb_config(eplb_config, layer_id, moe_config):
expert_map_path = eplb_config.expert_map_path
n_experts = moe_config.num_experts
ep_size = moe_config.ep_size
global_placement = None
eplb_enable = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
n_redundant = ascend_config.init_redundancy_expert if eplb_enable else 0
eplb_enable = eplb_config.dynamic_eplb
n_redundant = eplb_config.num_redundant_experts if eplb_enable else 0
if expert_map_path:
if not (os.path.exists(expert_map_path)
and os.access(expert_map_path, os.R_OK)):
@@ -83,6 +80,7 @@ def init_eplb_config(ascend_config, layer_id, moe_config):
n_redundant)
if ep_size == 1:
assert not eplb_enable, "EPLB must be used with expert parallelism."
return None, None, n_redundant
global_expert_map = []
for rankid in range(ep_size):
@@ -116,73 +114,3 @@ def generate_log2phy_map(global_expert_map, ep_rank):
torch.tensor(list(log2phy_map.values()), dtype=torch.int32))
return log2phy_map
class EPLBParamUtils:
@staticmethod
def check_iterations(iterations):
if not isinstance(iterations, int):
raise TypeError(f"The {iterations} is not int.")
if iterations <= 0:
raise ValueError(
f"The {iterations} can not less than or equal to 0.")
if iterations > sys.maxsize:
raise ValueError(
f"The {iterations} can not large than {sys.maxsize}")
@staticmethod
def check_dynamic_eplb(dynamic_eplb):
if dynamic_eplb is None:
return
if not isinstance(dynamic_eplb, bool):
raise TypeError("The dynamic_eplb is not bool.")
if dynamic_eplb and envs_ascend.DYNAMIC_EPLB not in ("true", "1"):
raise ValueError(
'Can not enable dynamic_eplb when DYNAMIC_EPLB is not set to "true" or "1".'
)
@staticmethod
def check_expert_map_path(expert_map):
if expert_map is None:
return
if not isinstance(expert_map, str):
raise TypeError("The expert_map is not str.")
if not expert_map.strip():
raise ValueError("The expert_map is not empty.")
_, ext = os.path.splitext(expert_map)
if ext.lower() != ".json":
raise TypeError("The expert_map is not json.")
if not os.path.exists(expert_map):
raise ValueError("The expert_map is not exist.")
try:
with open(expert_map, "w", encoding='utf-8') as f:
f.read()
except Exception as e:
raise IOError(
f"Fail read expert info from {expert_map}, please check the reading permission of {expert_map} : {e}"
)
@staticmethod
def check_expert_map_record_path(expert_map_record_path):
if expert_map_record_path is None:
return
if not isinstance(expert_map_record_path, str):
raise TypeError("The expert_map_record_path is not str.")
if not expert_map_record_path.strip():
raise ValueError("The expert_map_record_path is empty.")
_, ext = os.path.splitext(expert_map_record_path)
if ext.lower() != ".json":
raise TypeError("The expert_map_record_path is not json.")
if os.getenv("EXPERT_MAP_RECORD", "false") != "true":
raise ValueError(
'Can not enable expert_map_record_path when not export EXPERT_MAP_RECORD="true".'
)
try:
with open(expert_map_record_path, "w", encoding='utf-8') as f:
f.write("")
except Exception as e:
raise IOError(
f"Fail write expert info to {expert_map_record_path}, please check the writing permission of {expert_map_record_path} : {e}"
)

View File

@@ -21,16 +21,15 @@ import torch.distributed as dist
import vllm.envs as envs
from vllm.logger import logger
from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
from vllm_ascend.eplb.core.eplb_worker import EplbProcess
class EplbUpdator:
def __init__(self, ascend_config, loader, eplb_process: EplbProcess,
def __init__(self, eplb_config, loader, eplb_process: EplbProcess,
process):
self.ascend_config = ascend_config
self.init_eplb(self.ascend_config.expert_map_path, process)
self.eplb_config = eplb_config
self.init_eplb(self.eplb_config.expert_map_path, process)
self.eplb_loader = loader
self.eplb_process = eplb_process
self.shared_dict = self.eplb_process.shared_dict
@@ -45,28 +44,24 @@ class EplbUpdator:
self.rank_id = dist.get_rank()
self.num_expert_load_gather = 10
self.periodic_load_gather = True
self.num_iterations_eplb_update: torch.int64 = self.ascend_config.num_iterations_eplb_update
EPLBParamUtils.check_iterations(self.num_iterations_eplb_update)
self.expert_heat_collection_interval: torch.int64 = self.eplb_config.expert_heat_collection_interval
self.expert_map_path = expert_map_path
self.expert_map_record_path = self.ascend_config.expert_map_record_path
self.expert_map_record_path = self.eplb_config.expert_map_record_path
try:
if not envs.VLLM_ALLOW_EXPERT_LOAD_COLLECTING:
self.num_expert_load_gather = self.num_iterations_eplb_update
self.num_expert_load_gather = self.expert_heat_collection_interval
self.periodic_load_gather = False
except Exception:
self.num_expert_load_gather = self.num_iterations_eplb_update
self.num_expert_load_gather = self.expert_heat_collection_interval
self.periodic_load_gather = False
self.gate_eplb = self.ascend_config.gate_eplb
self.reqs = []
self.update_info_all = []
self.cur_iterations: torch.int64 = 0
self.num_wait_worker_iterations: torch.int64 = self.ascend_config.num_wait_worker_iterations
EPLBParamUtils.check_iterations(self.num_wait_worker_iterations)
self.algorithm_execution_interval: torch.int64 = self.eplb_config.algorithm_execution_interval
self.process = process
@@ -75,8 +70,8 @@ class EplbUpdator:
def update_iteration(self):
self.cur_iterations += 1
if self.cur_iterations == (self.num_iterations_eplb_update + \
self.num_wait_worker_iterations + self.num_moe_layers):
if self.cur_iterations == (self.expert_heat_collection_interval + \
self.algorithm_execution_interval + self.num_moe_layers):
logger.info("Finish expert parallel load balancing.")
if self.expert_map_record_path is not None:
self.adaptor._export_tensor_to_file(
@@ -84,19 +79,20 @@ class EplbUpdator:
self.expert_map_record_path)
self.adaptor.model.clear_all_moe_loads()
if not self.gate_eplb:
self.cur_iterations = 0
self.cur_iterations = 0
def get_update_info_flag(self):
return self.cur_iterations == (self.num_iterations_eplb_update +
self.num_wait_worker_iterations - 1)
return self.cur_iterations == (self.expert_heat_collection_interval +
self.algorithm_execution_interval - 1)
def wakeup_eplb_worker_flag(self):
return self.cur_iterations == (self.num_iterations_eplb_update - 1)
return self.cur_iterations == (self.expert_heat_collection_interval -
1)
def update_expert_weight_flag(self):
weight_update_counter = self.cur_iterations - (
self.num_iterations_eplb_update + self.num_wait_worker_iterations)
self.expert_heat_collection_interval +
self.algorithm_execution_interval)
return (weight_update_counter >= 0
and weight_update_counter < self.num_moe_layers)
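Taken together, the renamed counters define one rebalancing cycle. A standalone sketch of the schedule implied by the flag methods above, re-derived from the hunk with an illustrative `num_moe_layers`:

```python
# Schedule implied by the EplbUpdator flags above; values are illustrative.
heat = 400   # expert_heat_collection_interval: steps spent collecting expert load
algo = 30    # algorithm_execution_interval: steps given to the EPLB worker
layers = 58  # num_moe_layers: one layer's expert weights are swapped per step

wakeup_step = heat - 1          # wakeup_eplb_worker_flag() fires
collect_step = heat + algo - 1  # get_update_info_flag() fires
update_steps = range(heat + algo, heat + algo + layers)  # update_expert_weight_flag()
reset_step = heat + algo + layers  # update_iteration() resets cur_iterations to 0
```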

View File

@@ -55,7 +55,7 @@ class AscendUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod):
def __init__(self, moe: FusedMoEConfig = None):
super().__init__(moe=moe)
self.dynamic_eplb = get_ascend_config().dynamic_eplb
self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb
def process_weights_after_loading(self, layer):
super(UnquantizedFusedMoEMethod,
@@ -187,14 +187,14 @@ class AscendFusedMoE(FusedMoE):
dtype=vllm_config.model_config.dtype)
# init moe
eplb_config = ascend_config.eplb_config
self.global_expert_map, self.log2phy, self.global_redundant_expert_num = init_eplb_config(
ascend_config, self.moe_instance_id, self.moe_config)
eplb_config, self.moe_instance_id, self.moe_config)
if self.global_expert_map is not None:
self._expert_map = self.global_expert_map[self.ep_rank].npu()
self.global_num_experts = num_experts + self.global_redundant_expert_num
self.dynamic_eplb = (ascend_config.dynamic_eplb
or ascend_config.expert_map_record_path) and (
self.log2phy is not None)
self.dynamic_eplb = eplb_config.dynamic_eplb and (self.log2phy
is not None)
self.local_num_experts = (torch.sum(
self._expert_map != -1).item() if self._expert_map is not None else
self.global_num_experts)

View File

@@ -116,8 +116,7 @@ class AscendW4A16FusedMoEMethod:
vllm_config = get_current_vllm_config()
self.group_size = vllm_config.quant_config.quant_description.get(
"group_size", 32)
ascend_config = get_ascend_config()
self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb
def get_weight(
self,

View File

@@ -224,8 +224,7 @@ class AscendW4A8DynamicFusedMoEMethod:
# NOTE: new quantize weights: 2 int4 pack into int8
self.new_quant_version = quant_version == "1.0.0"
self.tp_size = 1 if vllm_config.parallel_config.enable_expert_parallel else self.ep_group.world_size
ascend_config = get_ascend_config()
self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
self.dynamic_eplb = get_ascend_config().eplb_config.dynamic_eplb
if self.new_quant_version and self.tp_size > 16:
raise ValueError(
"The current weight does not support moe part tp>16.")

View File

@@ -114,7 +114,7 @@ class AscendW8A8DynamicFusedMoEMethod:
and not vllm_config.model_config.enforce_eager)
self.multistream_overlap_gate = ascend_config.multistream_overlap_gate
self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
self.dynamic_eplb = ascend_config.eplb_config.dynamic_eplb
self.in_dtype = vllm_config.model_config.dtype
self.supports_eplb = True

View File

@@ -91,7 +91,6 @@ from vllm_ascend.compilation.acl_graph import (ACLGraphWrapper,
from vllm_ascend.eplb.adaptor.vllm_adaptor import VllmEplbAdaptor
from vllm_ascend.eplb.core.eplb_device_transfer_loader import \
D2DExpertWeightLoader
from vllm_ascend.eplb.core.eplb_utils import EPLBParamUtils
from vllm_ascend.eplb.core.eplb_worker import EplbProcess
from vllm_ascend.eplb.eplb_updator import EplbUpdator
from vllm_ascend.eplb.utils import model_register
@@ -290,13 +289,11 @@ class NPUModelRunner(GPUModelRunner):
self.use_aclgraph = self._use_aclgraph()
self.dynamic_eplb = self.ascend_config.dynamic_eplb or self.ascend_config.expert_map_record_path
eplb_config = self.ascend_config.eplb_config
self.dynamic_eplb = eplb_config.dynamic_eplb
if self.dynamic_eplb:
EPLBParamUtils.check_dynamic_eplb(self.ascend_config.dynamic_eplb)
EPLBParamUtils.check_expert_map_record_path(
self.ascend_config.expert_map_record_path)
self.is_eplb_warmuped = False
self.policy_type = self.ascend_config.eplb_policy_type
self.policy_type = eplb_config.eplb_policy_type
self.eplb_loader = D2DExpertWeightLoader()
self.manager = Manager()
self.shared_dict = self.manager.dict({
@@ -308,8 +305,7 @@ class NPUModelRunner(GPUModelRunner):
policy_type=self.policy_type,
enable_d2d=True)
self.process = self.eplb_process._launch_process()
ascend_config = get_ascend_config()
self.eplb_updator = EplbUpdator(ascend_config, self.eplb_loader,
self.eplb_updator = EplbUpdator(eplb_config, self.eplb_loader,
self.eplb_process, self.process)
# Input Batch
# NOTE(Chen): Ideally, we should initialize the input batch inside