# Monkey-patches vLLM's ``ExecutorBase`` with VRAM offload/reload helpers.
#
# Importing this module has side effects: it replaces ``ExecutorBase.__init__``
# with a wrapper that adds an ``is_offloaded`` flag, and attaches three extra
# methods (``offload_vram``, ``reload_vram`` and
# ``determine_available_memory_vnpu_offload_mode``) to the class.
import functools
import time

from vllm.executor.executor_base import ExecutorBase, logger

# Keep a handle on the unpatched constructor so the wrapper can delegate to it.
original_init = ExecutorBase.__init__


@functools.wraps(original_init)
def init(self, *args, **kwargs):
    """Run the original ``ExecutorBase.__init__`` and add the offload flag.

    All positional and keyword arguments are forwarded unchanged.
    ``functools.wraps`` keeps the original constructor's name, docstring and
    signature visible on the patched method.
    """
    original_init(self, *args, **kwargs)
    # Tracks whether ``offload_vram`` has run without a matching reload.
    self.is_offloaded = False


def offload_vram(self):
    """Ask every worker to offload its VRAM and mark the executor offloaded.

    Logs a warning and does nothing when the executor is already offloaded.
    Otherwise invokes the ``"offload_vram"`` RPC through ``collective_rpc``,
    sets ``is_offloaded`` and logs how long the RPC took.
    """
    if self.is_offloaded:
        logger.warning("Executor is already offloaded.")
        return

    time_before_offload = time.perf_counter()
    self.collective_rpc("offload_vram")
    time_after_offload = time.perf_counter()

    # Only flipped after the RPC returns, so a raising RPC leaves it False.
    self.is_offloaded = True
    logger.info(f"Offloading VRAM costs {time_after_offload - time_before_offload:.6f} seconds.")


def reload_vram(self) -> bool:
    """Reload VRAM on every worker, retrying until all of them succeed.

    Logs a warning and returns ``True`` immediately when the executor is not
    offloaded. Otherwise the ``"try_reload_vram"`` RPC is issued repeatedly;
    each per-worker result is indexed as ``result[0]`` (success flag) and
    ``result[1]``. When any worker reports failure, ``"vnpu_unlock_gpu"`` is
    invoked on the workers and the attempt is repeated after a 1 ms sleep.

    NOTE: there is no retry limit or timeout; the loop only ends once every
    worker reports success in the same attempt.

    Returns:
        ``True`` when ``result[1]`` is truthy for every worker in the
        successful attempt, ``False`` otherwise. The original code names this
        value ``prev_is_self``; presumably it indicates the device was last
        held by this same instance -- TODO confirm against the worker
        implementation.
    """
    if not self.is_offloaded:
        logger.warning("Executor is not offloaded.")
        return True

    while True:
        time_before_reload = time.perf_counter()
        res = self.collective_rpc("try_reload_vram")
        time_after_reload = time.perf_counter()

        if all(x[0] for x in res):
            self.is_offloaded = False
            # The logged duration covers the final, successful attempt only.
            logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
            return all(x[1] for x in res)

        # At least one worker did not get the lock: release on the workers
        # and back off briefly before trying again.
        self.collective_rpc("vnpu_unlock_gpu")
        time.sleep(0.001)


def determine_available_memory_vnpu_offload_mode(self) -> list[int]:
    """Return the workers' ``determine_available_memory_vnpu_offload_mode`` results.

    The value is whatever ``collective_rpc`` returns, passed through
    unmodified. ``reload_vram`` iterates that return value as a per-worker
    collection, so this is annotated as a list rather than a single ``int``.
    Each element is presumably that worker's available memory -- TODO confirm
    the unit against the worker implementation.
    """
    return self.collective_rpc("determine_available_memory_vnpu_offload_mode")


# Install the patched constructor and the new methods on ExecutorBase.
ExecutorBase.__init__ = init
ExecutorBase.offload_vram = offload_vram
ExecutorBase.reload_vram = reload_vram
ExecutorBase.determine_available_memory_vnpu_offload_mode = determine_available_memory_vnpu_offload_mode