[BugFix] Check all expert maps when running multiple instances. (#3662)
### What this PR does / why we need it?
Gather and compare the expert maps of all ranks so that a missing or inconsistent map is caught at startup when running multiple instances.

### Does this PR introduce _any_ user-facing change?
None.

### How was this patch tested?
Tested with Qwen 235B on a dual-A3 setup:
- Case 1: the master has an expert map, the slave has none.
- Case 2: the master has an expert map, the slave has an incorrect one.
- Case 3: the master has an expert map, the slave has the correct one.

- vLLM version: v0.11.0rc3
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.0

---------

Signed-off-by: offline0806 <3337230449@qq.com>
Co-authored-by: offline0806 <3337230449@qq.com>
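For reference, the expert map is a JSON file. Below is a minimal sketch consistent with the keys that `_expert_file_to_tensor` reads in the diff further down; the values are illustrative, not taken from the PR:

```json
{
  "moe_layer_count": 1,
  "layer_list": [
    {
      "device_count": 2,
      "device_list": [
        {"device_expert": [0, 1, 2, 3]},
        {"device_expert": [4, 5, 6, 7]}
      ]
    }
  ]
}
```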
```diff
@@ -192,6 +192,7 @@ class AscendFusedMoE(FusedMoE):
                       os.R_OK):
             self.expert_load_balancer = ExpertLoadBalancer(
                 self.expert_map_path, self.global_num_experts)
+            self.expert_load_balancer.check_expert_map_tensor()
             self.global_redundant_expert_num = (
                 self.expert_load_balancer.get_global_redundant_expert_num())
             try:
```
```diff
@@ -3,6 +3,7 @@ import random
 from typing import Dict, List

 import torch
+import torch.distributed as dist


 class ExpertLoadBalancer(object):
```
```diff
@@ -10,8 +11,10 @@ class ExpertLoadBalancer(object):
     def __init__(self, expert_map_path, global_expert_num):
         self.expert_map_path = expert_map_path
         self.global_expert_num = global_expert_num
+        self.tensor_data = []
         self.expert_map_tensor, self.layers_num, self.ranks_num = (
             self._expert_file_to_tensor())
+        self.expert_placement_map = self.generate_expert_placement_map()

     def _expert_file_to_tensor(self):
         with open(self.expert_map_path, "r") as f:
```
```diff
@@ -19,13 +22,12 @@ class ExpertLoadBalancer(object):
         layers_num = data["moe_layer_count"]
         gpus_num = data["layer_list"][0]["device_count"]

-        tensor_data = []
         for layer in data["layer_list"]:
             device_data = []
             for device in layer["device_list"]:
                 device_data.append(device["device_expert"])
-            tensor_data.append(device_data)
-        expert_map_tensor = torch.tensor(tensor_data, dtype=torch.int32)
+            self.tensor_data.append(device_data)
+        expert_map_tensor = torch.tensor(self.tensor_data, dtype=torch.int32)
         return expert_map_tensor, layers_num, gpus_num

     def generate_index_dicts(self, tensor_2d):
```
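As an illustration (not part of the PR), the parser turns the nested layer → device → expert lists into an int32 tensor of shape `(layers_num, gpus_num, experts_per_device)`. For the sample file shown above:

```python
import torch

# Nested lists as built by _expert_file_to_tensor: [layers][devices][experts].
tensor_data = [[[0, 1, 2, 3], [4, 5, 6, 7]]]
expert_map_tensor = torch.tensor(tensor_data, dtype=torch.int32)
print(expert_map_tensor.shape)  # torch.Size([1, 2, 4])
```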
```diff
@@ -81,8 +83,7 @@ class ExpertLoadBalancer(object):
         return log2phy_map

     def get_rank_placement_map(self, layer_id, rank_id):
-        expert_placement_map = self.generate_expert_placement_map()
-        layer_expert_map = expert_placement_map[layer_id]
+        layer_expert_map = self.expert_placement_map[layer_id]
         rank_expert_map = layer_expert_map[rank_id].to(
             torch.npu.current_device())
         rank_local_expert_num = torch.sum(torch.ne(rank_expert_map, -1)).item()
```
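Together with the new `self.expert_placement_map` attribute in `__init__`, the placement map is now built once per `ExpertLoadBalancer` instance and reused, instead of being regenerated on every `get_rank_placement_map` call.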
```diff
@@ -97,3 +98,20 @@ class ExpertLoadBalancer(object):
             len(self.expert_map_tensor[0][0]) * self.ranks_num -
             self.global_expert_num)
         return global_redundant_expert_num
+
+    def check_expert_map_tensor(self):
+        if dist.is_initialized():
+            try:
+                rank = dist.get_rank()
+                world_size = dist.get_world_size()
+                all_expert_maps = [None for _ in range(world_size)]
+                dist.all_gather_object(all_expert_maps, self.tensor_data)
+                for rank_id, expert_map_tensor in enumerate(all_expert_maps):
+                    if self.tensor_data != expert_map_tensor:
+                        raise ValueError(
+                            f"The expert map of rank{rank} is not equal to rank{rank_id}"
+                        )
+                return True
+            except Exception as e:
+                raise ValueError(
+                    f"The expert maps of all ranks are inconsistency: {e}")
```
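For illustration, here is a standalone sketch (not part of the PR) of the gather-and-compare pattern that `check_expert_map_tensor` implements. The `gloo` backend, the address/port, and the `_run` helper are assumptions made so the snippet runs on CPU without NPUs:

```python
import torch.distributed as dist
import torch.multiprocessing as mp


def _run(rank: int, world_size: int) -> None:
    dist.init_process_group("gloo",
                            init_method="tcp://127.0.0.1:29500",
                            rank=rank,
                            world_size=world_size)
    # Rank 1 deliberately holds a different map, mimicking a slave instance
    # that loaded a stale or corrupted expert-map file.
    tensor_data = [[[0, 1], [2, 3]]] if rank == 0 else [[[0, 1], [3, 2]]]
    all_expert_maps = [None for _ in range(world_size)]
    # Every rank gathers every other rank's nested expert-map lists.
    dist.all_gather_object(all_expert_maps, tensor_data)
    for rank_id, other_map in enumerate(all_expert_maps):
        if tensor_data != other_map:
            # check_expert_map_tensor() raises ValueError at this point.
            print(f"rank{rank}: expert map differs from rank{rank_id}")
    dist.destroy_process_group()


if __name__ == "__main__":
    mp.spawn(_run, args=(2, ), nprocs=2)
```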
```diff
@@ -1042,6 +1042,7 @@ class TorchairAscendFusedMoE(FusedMoE):
                       os.R_OK):
             self.expert_load_balancer = ExpertLoadBalancer(
                 self.expert_map_path, self.global_num_experts)
+            self.expert_load_balancer.check_expert_map_tensor()
             self.global_redundant_expert_num = (
                 self.expert_load_balancer.get_global_redundant_expert_num())
             try:
```