diff --git a/tests/ut/eplb/core/test_eplb_device_transfer_loader.py b/tests/ut/eplb/core/test_eplb_device_transfer_loader.py index 48777894..6c6ff263 100644 --- a/tests/ut/eplb/core/test_eplb_device_transfer_loader.py +++ b/tests/ut/eplb/core/test_eplb_device_transfer_loader.py @@ -107,10 +107,3 @@ def test_invalid_state_asyn_update(mock_adaptor): loader_obj.update_expert_map_and_weight([]) assert not mock_adaptor.do_update_expert_map.called - - -def test_load_impl_not_implemented(mock_adaptor): - loader_obj = loader.D2DExpertWeightLoader() - loader_obj.set_adator(mock_adaptor) - with pytest.raises(NotImplementedError): - loader_obj.load_impl({}, {}) diff --git a/vllm_ascend/eplb/core/eplb_device_transfer_loader.py b/vllm_ascend/eplb/core/eplb_device_transfer_loader.py index a5dc6154..728a61f2 100644 --- a/vllm_ascend/eplb/core/eplb_device_transfer_loader.py +++ b/vllm_ascend/eplb/core/eplb_device_transfer_loader.py @@ -34,7 +34,7 @@ class D2DExpertWeightLoader: self.layer_id = -1 # layer id to be updated self.state = ExpertWeightUpdateState.WAITING self.recv_expert_list = [] - self.mock_flag = True + self.num_layers = 0 def set_adator(self, eplb_adaptor): self.eplb_adaptor = eplb_adaptor @@ -103,12 +103,10 @@ class D2DExpertWeightLoader: local_expert_to_replace, buffer_tensor_id = recv_expert_info self.eplb_adaptor.do_update_expert_weight(self.layer_id, local_expert_to_replace, buffer_tensor_id) - logger.debug(f"[EPLB] finished update expert weight for layer: {self.layer_id}") + if self.layer_id == self.num_layers - 1: + logger.info("[EPLB] finished update expert weight.") self.recv_expert_list = [] self.updated_expert_map = None self.layer_id = -1 self.state = ExpertWeightUpdateState.WAITING - - def load_impl(self, old_expert_table, new_expert_table): - raise NotImplementedError diff --git a/vllm_ascend/eplb/core/eplb_worker.py b/vllm_ascend/eplb/core/eplb_worker.py index 469c84fd..6374601c 100644 --- a/vllm_ascend/eplb/core/eplb_worker.py +++ b/vllm_ascend/eplb/core/eplb_worker.py @@ -68,7 +68,7 @@ class EplbWorker: update_info = self.compose_expert_update_info_greedy(new_expert_maps, self.old_expert_maps) self.old_expert_maps = new_expert_maps - logger.info("EPLB Process compute complete") + logger.debug("EPLB Process compute complete") packed_update_info = self.pack_update_info(update_info) @@ -274,6 +274,10 @@ class EplbProcess: Subprocess entry: bind to specified NPU, loop waiting for planner_q to wake up, call do_update, then notify main process update is complete. """ + if self.policy_type == 3: + from vllm_ascend.eplb.core.policy.policy_flashlb import warm_up + + warm_up() while True: try: planner_q.get() diff --git a/vllm_ascend/eplb/eplb_updator.py b/vllm_ascend/eplb/eplb_updator.py index b9cd66a8..536786c2 100644 --- a/vllm_ascend/eplb/eplb_updator.py +++ b/vllm_ascend/eplb/eplb_updator.py @@ -22,11 +22,12 @@ import vllm.envs as envs from vllm.logger import logger from vllm_ascend.eplb.adaptor.vllm_adaptor import VllmEplbAdaptor +from vllm_ascend.eplb.core.eplb_device_transfer_loader import D2DExpertWeightLoader from vllm_ascend.eplb.core.eplb_worker import EplbProcess class EplbUpdator: - def __init__(self, eplb_config, loader, eplb_process: EplbProcess, process): + def __init__(self, eplb_config, loader: D2DExpertWeightLoader, eplb_process: EplbProcess, process): self.eplb_config = eplb_config self.init_eplb(self.eplb_config.expert_map_path, process) self.eplb_loader = loader @@ -42,6 +43,7 @@ class EplbUpdator: self.device = local_load.device shape = (self.world_size, *local_load.shape) self._gather_buffer = torch.empty(shape, dtype=local_load.dtype, device=self.device) + self.eplb_loader.num_layers = self.adaptor.num_dense_layers + self.adaptor.num_moe_layers def init_eplb(self, expert_map_path, process): self.rank_id = dist.get_rank() @@ -75,7 +77,6 @@ class EplbUpdator: if self.cur_iterations == ( self.expert_heat_collection_interval + self.algorithm_execution_interval + self.num_moe_layers ): - logger.info("Finish expert parallel load balancing.") if self.expert_map_record_path is not None: self.adaptor._export_tensor_to_file(self.shared_dict["expert_maps"], self.expert_map_record_path) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index b1d925a5..feccb8a7 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -1099,6 +1099,10 @@ class NPUModelRunner(GPUModelRunner): "logprobs for prompt tokens, tokens, please disable " "it when the requests need prompt logprobs" ) + + if self.dynamic_eplb: + self.eplb_updator.forward_before() + num_reqs = self.input_batch.num_reqs req_ids = self.input_batch.req_ids tokens = [scheduler_output.num_scheduled_tokens[i] for i in req_ids] @@ -1198,6 +1202,9 @@ class NPUModelRunner(GPUModelRunner): ec_connector_output, ) = self._preprocess(scheduler_output, num_tokens_padded, intermediate_tensors) + if self.dynamic_eplb: + self.eplb_updator.take_update_info_from_eplb_process() + # update global cos, sin update_cos_sin(positions) @@ -2072,6 +2079,9 @@ class NPUModelRunner(GPUModelRunner): assert sum(num_scheduled_tokens_list) == num_tokens assert len(num_scheduled_tokens_list) == num_reqs + if not is_profile and self.dynamic_eplb: + self.eplb_updator.forward_before() + num_scheduled_tokens = np.array(num_scheduled_tokens_list, dtype=np.int32) self.query_lens = torch.from_numpy(num_scheduled_tokens) num_tokens_unpadded = int(num_scheduled_tokens.sum())