From b6549b6e3805fa1b88b5a643026335c9eed15302 Mon Sep 17 00:00:00 2001
From: Jing Wang
Date: Tue, 12 May 2026 11:51:57 +0000
Subject: [PATCH] Add feature: priority

Signed-off-by: Jing Wang
---
 README.md                                    |   1 +
 csrc/camem_allocator.cpp                     |  29 ++++-
 csrc/vnpu_offload/shm_helper.h               |  39 ++++++-
 csrc/vnpu_offload/shm_manager.cpp            |   4 +-
 csrc/vnpu_offload/shm_worker.cpp             | 114 ++++++++++++++++--
 csrc/vnpu_offload/shm_worker.h               |  16 ++-
 csrc/vnpu_offload/vnpu_daemon.cpp            |  43 ++++---
 vllm_ascend/device_allocator/camem.py        |  37 +++++-
 vllm_ascend/patch/platform/patch_core.py     | 115 ++++++++++++++++---
 vllm_ascend/patch/platform/patch_executor.py |  32 +++++-
 vllm_ascend/worker/worker.py                 |  18 ++-
 11 files changed, 382 insertions(+), 66 deletions(-)

diff --git a/README.md b/README.md
index 423febef..ee8b377e 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,7 @@ docker build -t $build_image -f ./Dockerfile .
 ### Environment Variables
 - `VNPU_RESERVED_VRAM_SIZE_GB`: The amount of GPU memory reserved for other miscellaneous allocations. Only needs to be set for `vllm_vnpu_daemon`. Try increasing this value if you launch multiple LLM services and encounter OOM. Default: `8`.
 - `VLLM_VNPU_SHM_NAME`: The name of the shm file. Needs to be set for all containers of the shared vNPU group. Default: `/vllm_acl_vnpu_offload_shm`.
+- `VLLM_VNPU_PRIORITY`: The priority of the LLM service. Higher-priority services take precedence when competing for the device. The value must be an integer in the range [0, 7]. Default: `0`.

 ## Limitations

diff --git a/csrc/camem_allocator.cpp b/csrc/camem_allocator.cpp
index eb76b15d..b0960e72 100644
--- a/csrc/camem_allocator.cpp
+++ b/csrc/camem_allocator.cpp
@@ -595,10 +595,29 @@ static PyObject* python_try_lock_gpu_offload(PyObject* self, PyObject* args) {
 }

 static PyObject* python_unlock_gpu_offload(PyObject* self, PyObject* args) {
-  shm_worker->unlock_gpu();
+  int keep_wait = 0;
+  if (!PyArg_ParseTuple(args, "|p", &keep_wait)) {
+    return NULL;
+  }
+  shm_worker->unlock_gpu(keep_wait != 0);
   Py_RETURN_NONE;
 }

+static PyObject* python_start_wait_offload(PyObject* self, PyObject* args) {
+  shm_worker->start_wait();
+  Py_RETURN_NONE;
+}
+
+static PyObject* python_cancel_wait_offload(PyObject* self, PyObject* args) {
+  shm_worker->cancel_wait();
+  Py_RETURN_NONE;
+}
+
+static PyObject* python_has_higher_priority_waiter_offload(PyObject* self, PyObject* args) {
+  bool has_higher = shm_worker->has_higher_priority_waiter();
+  return PyBool_FromLong(has_higher);
+}
+
 static PyMethodDef module_methods[] = {
     {"init_module", (PyCFunction)py_init_module, METH_VARARGS,
      "Initialize module with python_malloc and python_free callables."},
@@ -619,7 +638,13 @@ static PyMethodDef module_methods[] = {
     {"python_try_lock_gpu_offload", (PyCFunction)python_try_lock_gpu_offload,
      METH_NOARGS, "Lock GPU."},
     {"python_unlock_gpu_offload", (PyCFunction)python_unlock_gpu_offload,
-     METH_NOARGS, "Unlock GPU."},
+     METH_VARARGS, "Unlock GPU."},
+    {"python_start_wait_offload", (PyCFunction)python_start_wait_offload,
+     METH_NOARGS, "Start waiting for GPU lock."},
+    {"python_cancel_wait_offload", (PyCFunction)python_cancel_wait_offload,
+     METH_NOARGS, "Cancel waiting for GPU lock."},
+    {"python_has_higher_priority_waiter_offload", (PyCFunction)python_has_higher_priority_waiter_offload,
+     METH_NOARGS, "Check if there is a higher-priority waiter."},
     {NULL, NULL, 0, NULL}  // sentinel
 };
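For orientation, here is roughly how the new entry points are meant to be called from Python. This is an illustrative sketch, not a test from the repo: the import path matches `camem.py` later in this patch, and the call sequence mirrors the allocator's retry loop.

```python
# Hypothetical smoke test for the new C-API entry points added above.
# Signatures follow the METH_* declarations: unlock now accepts an optional
# bool (keep_wait); the other three take no arguments.
from vllm_ascend.vllm_ascend_C import (
    python_start_wait_offload,
    python_cancel_wait_offload,
    python_has_higher_priority_waiter_offload,
    python_unlock_gpu_offload,
)

python_start_wait_offload()        # publish this process as a waiter
assert isinstance(python_has_higher_priority_waiter_offload(), bool)
python_unlock_gpu_offload(True)    # release the lock but stay in the wait queue
python_cancel_wait_offload()       # withdraw from the wait queue
```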
diff --git a/csrc/vnpu_offload/shm_helper.h b/csrc/vnpu_offload/shm_helper.h
index 9585e43c..821d07ea 100644
--- a/csrc/vnpu_offload/shm_helper.h
+++ b/csrc/vnpu_offload/shm_helper.h
@@ -16,8 +16,8 @@
 #include "spdlog/spdlog.h"

-#define MAX_WORKERS 60
-#define MAX_DEVICES 16
+#define MAX_WORKERS 64
+#define MAX_DEVICES 32

 static inline std::string get_shm_name() {
   const char *env_shm_name = getenv("VLLM_VNPU_SHM_NAME");
@@ -34,7 +34,7 @@ static inline std::string get_shm_name() {
 }

 static constexpr uint32_t heartbeat_us = 1000;  // microseconds
-static constexpr uint32_t heartbeat_check_everyN = 50;
+static constexpr uint32_t heartbeat_check_everyN = 100;
 static constexpr uint32_t heartbeat_timeout_us =
     heartbeat_check_everyN * heartbeat_us;
@@ -52,6 +52,8 @@ static inline uint64_t heartbeat_ts_us() {
           .count());
 }

+// GPU flag layout (64 bits):
+// [lock (1 bit) | reserved (31 bits) | tgid (32 bits)]
 static inline uint32_t unpack_lock_field(uint64_t gpu_flag) {
   return static_cast<uint32_t>(gpu_flag >> 32);
 }
@@ -68,16 +70,43 @@ static inline uint64_t pack_unlocked_tgid(int32_t tgid) {
   return static_cast<uint64_t>(tgid);
 }

+// waiting_worker_flag layout (64 bits):
+// [device_id (5 bits) | priority (3 bits) | timestamp (24 bits) | tgid (32 bits)]
+
+static inline uint32_t unpack_waiting_device_id(uint64_t flag) {
+  return static_cast<uint32_t>(flag >> 59);
+}
+
+static inline uint16_t unpack_waiting_priority(uint64_t flag) {
+  return static_cast<uint16_t>((flag >> 56) & 0x7);
+}
+
+static inline uint32_t unpack_waiting_timestamp_ms(uint64_t flag) {
+  return static_cast<uint32_t>((flag >> 32) & 0xFFFFFF);
+}
+
+static inline int32_t unpack_waiting_tgid(uint64_t flag) {
+  return static_cast<int32_t>(flag & 0xFFFFFFFF);
+}
+
+static inline uint64_t pack_waiting_flag(uint32_t device_id, uint16_t priority,
+                                         uint32_t timestamp, int32_t tgid) {
+  return (static_cast<uint64_t>(device_id & 0x1F) << 59) |
+         (static_cast<uint64_t>(priority & 0x7) << 56) |
+         (static_cast<uint64_t>(timestamp & 0xFFFFFF) << 32) |
+         (static_cast<uint64_t>(tgid) & 0xFFFFFFFF);
+}
+
 // mmap usually page-aligned
 struct alignas(64) ShmHelper {
   struct VramInfo {
     uint64_t total_vmem_size;
     uint64_t shareable_handle;
   };
-  VramInfo vram_info[MAX_DEVICES];  // support max 16 NPUs
+  VramInfo vram_info[MAX_DEVICES];  // support max 32 devices

   // GPU lock flag
   std::atomic<uint64_t> gpu_flag[MAX_DEVICES];
-  // uint8_t _padding1[64 - sizeof(std::atomic<uint64_t>)];
+  std::atomic<uint64_t> waiting_worker_flags[MAX_WORKERS];

   // request
   enum RequestType : uint32_t {
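Since several components read these packed words, a small pure-Python model of the layout can serve as a reference. Illustrative only; the widths and shifts mirror the helpers above, and the round-trip values are arbitrary.

```python
# Pure-Python model of the waiting_worker_flag bit layout:
# [device_id (5 bits) | priority (3 bits) | timestamp_ms (24 bits) | tgid (32 bits)]
def pack_waiting_flag(device_id: int, priority: int, timestamp: int, tgid: int) -> int:
    return (((device_id & 0x1F) << 59)
            | ((priority & 0x7) << 56)
            | ((timestamp & 0xFFFFFF) << 32)
            | (tgid & 0xFFFFFFFF))

def unpack(flag: int) -> tuple[int, int, int, int]:
    return (flag >> 59,               # device_id
            (flag >> 56) & 0x7,       # priority
            (flag >> 32) & 0xFFFFFF,  # timestamp_ms (truncated to 24 bits)
            flag & 0xFFFFFFFF)        # tgid; the C++ side casts to int32_t,
                                      # equivalent for positive Linux TGIDs

# Round-trip check: device 17, priority 5, a 24-bit ms timestamp, tgid 4242.
flag = pack_waiting_flag(17, 5, 0xABCDEF, 4242)
assert unpack(flag) == (17, 5, 0xABCDEF, 4242)
```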
diff --git a/csrc/vnpu_offload/shm_manager.cpp b/csrc/vnpu_offload/shm_manager.cpp
index a591f280..8a225490 100644
--- a/csrc/vnpu_offload/shm_manager.cpp
+++ b/csrc/vnpu_offload/shm_manager.cpp
@@ -55,7 +55,7 @@ void ShmManager::run_busy_loop() {
   while (!stop_loop_flag.load(std::memory_order_acquire)) {
     process_requests();

-    if (loop_cnt % heartbeat_check_everyN== 0) {
+    if (loop_cnt % heartbeat_check_everyN == 0) {
       check_heart_beats();
     }
     loop_cnt = (loop_cnt + 1) % heartbeat_check_everyN;
@@ -152,6 +152,8 @@ void ShmManager::check_heart_beats() {
       shm_helper->heart_beats[i].tgid = 0;
       shm_helper->heart_beats[i].timestamp.store(0, std::memory_order_release);
+      // clear waiting flag
+      shm_helper->waiting_worker_flags[i].store(0, std::memory_order_release);
       // check dead lock
       for (int gpu_id : valid_gpu_ids) {
         uint64_t gpu_flag =

diff --git a/csrc/vnpu_offload/shm_worker.cpp b/csrc/vnpu_offload/shm_worker.cpp
index 3a73a782..eed4b0b4 100644
--- a/csrc/vnpu_offload/shm_worker.cpp
+++ b/csrc/vnpu_offload/shm_worker.cpp
@@ -1,7 +1,29 @@
 #include "shm_worker.h"

+static inline uint16_t get_shm_priority() {
+  const char *env_priority = getenv("VLLM_VNPU_PRIORITY");
+  if (env_priority) {
+    try {
+      int p = std::stoi(env_priority);
+      if (p >= 0 && p <= 7) {
+        return static_cast<uint16_t>(p);
+      } else {
+        spdlog::warn("VLLM_VNPU_PRIORITY should be between 0 and 7, got {}. Using default 0.", p);
+      }
+    } catch (...) {
+      spdlog::warn("Invalid VLLM_VNPU_PRIORITY format. Using default 0.");
+    }
+  }
+  return 0;
+}
+
 ShmWorker::ShmWorker() {
+  this->priority = get_shm_priority();
+  this->waiting_timestamp = 0;
+  this->is_waiting = false;
+  this->is_holding_lock = false;
+  spdlog::info("vNPU worker initialized with priority {}", priority);
   std::string shm_name = get_shm_name();
   int shm_fd = shm_open(shm_name.c_str(), O_RDWR, 0666);
   if (shm_fd == -1) {
@@ -40,16 +62,18 @@ bool ShmWorker::register_worker(int32_t tgid, int gpu_id,
   if (slot == -1) {
     return false;
   }
+  this->shm_slot = slot;

   *out_shareable_handle = shm_helper->vram_info[gpu_id].shareable_handle;
   *out_vmem_size = shm_helper->vram_info[gpu_id].total_vmem_size;

   stop_heart_beat.store(false, std::memory_order_release);
-  heart_beat_thread = std::thread(&ShmWorker::heart_beat_loop, this, slot);
+  heart_beat_thread = std::thread(&ShmWorker::heart_beat_loop, this);

   return true;
 }

-void ShmWorker::heart_beat_loop(int slot) {
+void ShmWorker::heart_beat_loop() {
+  int slot = this->shm_slot;
   while (!stop_heart_beat.load(std::memory_order_acquire)) {
     // update heart beat
     int32_t shm_tgid =
@@ -64,6 +88,7 @@ void ShmWorker::heart_beat_loop(int slot) {
         spdlog::error("TGID {} failed to re-register as worker", tgid);
         throw std::runtime_error("Failed to re-register as worker");
       }
+      this->shm_slot = slot;
     }
     uint64_t now = heartbeat_ts_us();
     shm_helper->heart_beats[slot].timestamp.store(now,
@@ -72,32 +97,95 @@
   }
 }

+void ShmWorker::start_wait() {
+  if (is_waiting) return;  // keep the older timestamp if already waiting
+
+  // Use the lower 24 bits of the millisecond timestamp.
+  waiting_timestamp = static_cast<uint32_t>((heartbeat_ts_us() / 1000) & 0xFFFFFF);
+
+  uint64_t flag = pack_waiting_flag(this->gpu_id, this->priority, waiting_timestamp, this->tgid);
+  shm_helper->waiting_worker_flags[this->shm_slot].store(flag, std::memory_order_release);
+  is_waiting = true;
+}
+
+void ShmWorker::cancel_wait() {
+  if (!is_waiting) return;
+
+  shm_helper->waiting_worker_flags[this->shm_slot].store(0, std::memory_order_release);
+  is_waiting = false;
+}
+
+bool ShmWorker::has_higher_priority_waiter() {
+  for (int i = 0; i < MAX_WORKERS; ++i) {
+    if (i == this->shm_slot) continue;
+
+    uint64_t flag = shm_helper->waiting_worker_flags[i].load(std::memory_order_acquire);
+    if (flag == 0) continue;
+    if (unpack_waiting_device_id(flag) != this->gpu_id) continue;
+
+    uint16_t other_prio = unpack_waiting_priority(flag);
+
+    if (other_prio > this->priority) {
+      return true;  // found a waiter with strictly higher priority
+    } else if (other_prio == this->priority) {
+      if (this->is_holding_lock) {
+        // A lock holder does not need to yield to same-priority waiters.
+        continue;
+      }
+      if (!this->is_waiting) {
+        // We are not queued yet, so any same-priority waiter is earlier.
+        return true;
+      }
+      uint32_t other_ts = unpack_waiting_timestamp_ms(flag);
+      // Same priority: compare timestamps, handling 24-bit wrap-around via
+      // 24-bit unsigned subtraction. If the difference lands in the lower
+      // half of the range, our timestamp is greater, i.e. we started
+      // waiting later.
+      uint32_t diff = (this->waiting_timestamp - other_ts) & 0xFFFFFF;
+      if (diff > 0 && diff < 0x800000) {
+        return true;  // the other worker started waiting earlier
+      } else if (diff == 0 && unpack_waiting_tgid(flag) < this->tgid) {
+        // Fall back to the tgid if the timestamps happen to be identical.
+        return true;
+      }
+    }
+  }
+  return false;
+}
+
 bool ShmWorker::try_lock_gpu(bool &out_self_hold) {
   static int retry_cnt = 0;

   uint64_t old_flag =
       shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
   if (unpack_lock_field(old_flag) == 0) {
     // free
+    // Priority check: yield if there is a higher-priority waiter, or a
+    // same-priority waiter that has been waiting longer.
+    if (has_higher_priority_waiter()) {
+      out_self_hold = false;
+      return false;
+    }
+
     uint64_t new_flag = pack_locked_tgid(tgid);
     if (shm_helper->gpu_flag[gpu_id].compare_exchange_weak(
             old_flag, new_flag, std::memory_order_acq_rel,
             std::memory_order_acquire)) {
-      spdlog::info("TGID {} acquired GPU {} lock", tgid, gpu_id);
+      // spdlog::info("TGID {} acquired GPU {} lock", tgid, gpu_id);
       int32_t prev_tgid = unpack_tgid_field(old_flag);
       out_self_hold = prev_tgid == tgid;
       retry_cnt = 0;
+      this->is_holding_lock = true;
       return true;
     }
   } else {
     // locked
     if (unpack_tgid_field(old_flag) == tgid) {
-      spdlog::info("TGID {} already holds the GPU {} lock", tgid, gpu_id);
+      // spdlog::info("TGID {} already holds the GPU {} lock", tgid, gpu_id);
       out_self_hold = true;
       retry_cnt = 0;
+      this->is_holding_lock = true;
       return true;
     }
   }
   // failed
-  if (++retry_cnt % 2000 == 0) {
+  if (++retry_cnt % 10000 == 0) {
     spdlog::info(
         "TGID {} trying to acquire GPU {} lock, current lock holder TGID {}",
         tgid, gpu_id, unpack_tgid_field(old_flag));
@@ -116,19 +204,23 @@ bool ShmWorker::lock_gpu(bool &out_self_hold) {
   }
 }

-void ShmWorker::unlock_gpu() {
+void ShmWorker::unlock_gpu(bool keep_wait) {
+  if (!keep_wait) {
+    cancel_wait();
+  }
+
   uint64_t old_flag =
       shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
   if (unpack_tgid_field(old_flag) != tgid) {
-    // spdlog::warn("previous gpu flag {} does not match expected locked flag for "
-    //              "TGID {}. This may be a bug, unless during startup.",
-    //              old_flag, tgid);
-    spdlog::info("TGID {} does not hold GPU {} lock", tgid, gpu_id);
+    if (!keep_wait) {
+      spdlog::info("unlock: TGID {} does not hold GPU {} lock", tgid, gpu_id);
+    }
   } else {
     uint64_t new_flag = pack_unlocked_tgid(tgid);
     shm_helper->gpu_flag[gpu_id].store(new_flag, std::memory_order_release);
-    spdlog::info("TGID {} released GPU {} lock", tgid, gpu_id);
+    // spdlog::info("TGID {} released GPU {} lock", tgid, gpu_id);
   }
+  this->is_holding_lock = false;
 }

 uint64_t ShmWorker::make_request(uint32_t type, uint64_t parameter) {
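The same-priority tie-break relies on 24-bit modular timestamp comparison, which is worth a worked example. A standalone Python check mirroring the masked subtraction in `has_higher_priority_waiter` (the timestamps are made up):

```python
MASK = 0xFFFFFF   # 24-bit timestamp space
HALF = 0x800000   # half the range

def other_started_earlier(my_ts: int, other_ts: int) -> bool:
    """True if other_ts precedes my_ts under 24-bit modular arithmetic."""
    diff = (my_ts - other_ts) & MASK
    return 0 < diff < HALF

# Plain case: I started at t=5000 ms, the other worker at t=1000 ms.
assert other_started_earlier(5000, 1000)
# Wrap-around case: the other worker queued just before the 24-bit counter
# wrapped (t=0xFFFFF0), I queued just after (t=0x000010); it is still earlier.
assert other_started_earlier(0x000010, 0xFFFFF0)
# Differences of half the range or more are treated as "I am earlier".
assert not other_started_earlier(1000, 5000)
```

Treating differences of half the range or more as "the other started later" keeps the ordering correct as long as two contending waits begin less than 2^23 ms (about 2.3 hours) apart.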
diff --git a/csrc/vnpu_offload/shm_worker.h b/csrc/vnpu_offload/shm_worker.h
index cbd4e4c8..95444689 100644
--- a/csrc/vnpu_offload/shm_worker.h
+++ b/csrc/vnpu_offload/shm_worker.h
@@ -17,11 +17,21 @@ class ShmWorker {

   bool try_lock_gpu(bool &out_self_hold);
   bool lock_gpu(bool &out_self_hold);
-  void unlock_gpu();
+  void unlock_gpu(bool keep_wait = false);
+
+  bool has_higher_priority_waiter();
+  void start_wait();
+  void cancel_wait();

 private:
   int32_t tgid;
   int gpu_id;
+  int shm_slot;
+  uint16_t priority;
+  uint32_t waiting_timestamp;
+  bool is_waiting;
+  bool is_holding_lock;
+
   ShmHelper *shm_helper;
   std::thread heart_beat_thread;
   std::atomic<bool> stop_heart_beat;
@@ -31,5 +41,5 @@ class ShmWorker {
   int register_worker_shm();

   // heart beat
-  void heart_beat_loop(int slot);
-};
+  void heart_beat_loop();
+};
\ No newline at end of file

diff --git a/csrc/vnpu_offload/vnpu_daemon.cpp b/csrc/vnpu_offload/vnpu_daemon.cpp
index 5a29d4a6..27af3b74 100644
--- a/csrc/vnpu_offload/vnpu_daemon.cpp
+++ b/csrc/vnpu_offload/vnpu_daemon.cpp
@@ -1,25 +1,29 @@
 #include <...>
 #include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
-#include <...>
 #include <...>
+#include <...>
 #include <...>
 #include <...>
+#include <...>
+#include <...>
+#include <...>
+#include <...>
+#include <...>

 #include "acl/acl.h"
-#include "shm_manager.h"
 #include "npu_helper.h"
+#include "shm_manager.h"
 #include "spdlog/spdlog.h"

 static ShmManager *shm_manager = nullptr;

+static inline double TO_GB(size_t bytes) {
+  return static_cast<double>(bytes) / (1024.0 * 1024.0 * 1024.0);
+}
+
 void handle_signal(int sig) {
   if (shm_manager) {
     shm_manager->stop_busy_loop();
@@ -49,7 +53,7 @@ size_t get_reserved_vram_size() {
       reserved_vram_size = size_gb * 1024 * 1024 * 1024;
     } catch (const std::exception &e) {
       spdlog::warn("Failed to parse VNPU_RESERVED_VRAM_SIZE_GB: {}, using "
-                   "default 8GB",
+                   "default 8 GB",
                    e.what());
     }
   }
@@ -68,12 +72,13 @@ void ensure_context(unsigned long long device) {
 }

 void init_acl() {
-  int32_t deviceId=0;
+  int32_t deviceId = 0;
   aclError ret = aclrtSetDevice(deviceId);
   if (ret != ACL_ERROR_NONE) {
-    throw std::runtime_error("aclrtSetDevice failed with acl error code: " +
-        std::to_string(ret) + " " + __FILE__ + ":" + std::to_string(__LINE__));
+    throw std::runtime_error(
+        "aclrtSetDevice failed with acl error code: " + std::to_string(ret) +
+        " " + __FILE__ + ":" + std::to_string(__LINE__));
   }
 }
@@ -109,8 +114,9 @@ void alloc_physical(uint32_t device_id, aclrtDrvMemHandle &out_mem_handle,
     spdlog::error("aclrtGetMemInfo failed, error_code: {}", error_code);
     throw std::runtime_error("aclrtGetMemInfo failed");
   } else {
-    spdlog::info("aclrtGetMemInfo succeeded, free_mem: {}, total: {}", free_mem,
-                 total);
+    spdlog::info(
+        "aclrtGetMemInfo succeeded, free_mem: {:.2f} GB, total: {:.2f} GB",
+        TO_GB(free_mem), TO_GB(total));
   }

   aclrtPhysicalMemProp prop = {};
@@ -129,13 +135,14 @@ void alloc_physical(uint32_t device_id, aclrtDrvMemHandle &out_mem_handle,
                   error_code);
     throw std::runtime_error("aclrtMemGetAllocationGranularity failed");
   } else {
-    spdlog::info("aclrtMemGetAllocationGranularity succeeded, granularity: {}",
+    spdlog::info("aclrtMemGetAllocationGranularity succeeded, granularity: {} bytes",
                  granularity);
   }
   size_t reserved_mem_size = get_reserved_vram_size();
   if (free_mem < reserved_mem_size) {
-    spdlog::error("Not enough free memory to reserve: {}, free_mem: {}",
-                  reserved_mem_size, free_mem);
+    spdlog::error(
+        "Not enough free memory to reserve: {:.2f} GB, free_mem: {:.2f} GB",
+        TO_GB(reserved_mem_size), TO_GB(free_mem));
     throw std::runtime_error("Not enough free memory to reserve");
   }
   out_g_size = free_mem - reserved_mem_size;
@@ -147,8 +154,8 @@ void alloc_physical(uint32_t device_id, aclrtDrvMemHandle &out_mem_handle,
     spdlog::error("aclrtMallocPhysical failed, error_code: {}", error_code);
     throw std::runtime_error("aclrtMallocPhysical failed");
   } else {
-    spdlog::info("device {} aclrtMallocPhysical succeeded, size: {}", device_id,
-                 out_g_size);
+    spdlog::info("device {} aclrtMallocPhysical succeeded, size: {:.2f} GB",
+                 device_id, TO_GB(out_g_size));
   }
 }
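For intuition about the daemon's pool sizing above, a hypothetical Python rendering of the same arithmetic (the numbers are examples; `pool_size` is not a function in the patch):

```python
GIB = 1024 ** 3

def pool_size(free_mem: int, reserved_gb: int = 8) -> int:
    """Mirror of alloc_physical's sizing: pool = free - reserved."""
    reserved = reserved_gb * GIB
    if free_mem < reserved:
        raise RuntimeError("Not enough free memory to reserve")
    return free_mem - reserved

assert pool_size(60 * GIB) == 52 * GIB                   # default reservation
assert pool_size(60 * GIB, reserved_gb=16) == 44 * GIB   # VNPU_RESERVED_VRAM_SIZE_GB=16
```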
diff --git a/vllm_ascend/device_allocator/camem.py b/vllm_ascend/device_allocator/camem.py
index e3cc6fd6..f76531ba 100644
--- a/vllm_ascend/device_allocator/camem.py
+++ b/vllm_ascend/device_allocator/camem.py
@@ -62,7 +62,10 @@ try:
             python_create_and_map_offload as python_create_and_map,
             python_unmap_and_release_offload as python_unmap_and_release,
             python_get_mem_info_offload as python_get_mem_info,
             python_try_lock_gpu_offload as python_try_lock_gpu,
-            python_unlock_gpu_offload as python_unlock_gpu
+            python_unlock_gpu_offload as python_unlock_gpu,
+            python_start_wait_offload as python_start_wait,
+            python_cancel_wait_offload as python_cancel_wait,
+            python_has_higher_priority_waiter_offload as python_has_higher_priority_waiter
         )
     else:
         from vllm_ascend.vllm_ascend_C import (  # type: ignore # noqa: F401
@@ -73,6 +76,9 @@ try:
         python_get_mem_info = None
         python_try_lock_gpu = None
         python_unlock_gpu = None
+        python_start_wait = None
+        python_cancel_wait = None
+        python_has_higher_priority_waiter = None

     lib_name = find_loaded_library("vllm_ascend_C")
     camem_available = True
@@ -84,6 +90,9 @@ except ImportError as e:
     python_get_mem_info = None
     python_try_lock_gpu = None
     python_unlock_gpu = None
+    python_start_wait = None
+    python_cancel_wait = None
+    python_has_higher_priority_waiter = None
     lib_name = None
     libcudart = None
@@ -306,15 +315,37 @@ class CaMemAllocator:
         return False, False

     def _vnpu_lock_gpu(self) -> bool:
+        is_waiting = False
         while True:
             success, _ = self.vnpu_try_lock_gpu()
             if success:
+                if is_waiting:
+                    self.vnpu_cancel_wait()
                 return True
+            else:
+                if not is_waiting:
+                    self.vnpu_start_wait()
+                    is_waiting = True
+                self.vnpu_unlock_gpu(keep_wait=True)
+                time.sleep(0.001)

-    def vnpu_unlock_gpu(self):
+    def vnpu_unlock_gpu(self, keep_wait: bool = False):
         if python_unlock_gpu:
-            python_unlock_gpu()
+            python_unlock_gpu(keep_wait)
+
+    def vnpu_start_wait(self) -> None:
+        if python_start_wait:
+            python_start_wait()
+
+    def vnpu_cancel_wait(self) -> None:
+        if python_cancel_wait:
+            python_cancel_wait()
+
+    def vnpu_has_higher_priority_waiter(self) -> bool:
+        if python_has_higher_priority_waiter:
+            return python_has_higher_priority_waiter()
+        return False

     def get_pool_mem_info(self) -> tuple[int, int]:
         """
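Taken together, the allocator methods implement a polite spinlock. A hypothetical driver showing the intended call order (the method names come from the diff; the sequence and comments are illustrative):

```python
from vllm_ascend.device_allocator.camem import CaMemAllocator

allocator = CaMemAllocator.get_instance()

# Acquire: _vnpu_lock_gpu() spins. On each failed attempt it (1) registers
# this process as a waiter via vnpu_start_wait(), so lock holders can see our
# priority, and (2) releases any partial hold with keep_wait=True so the
# waiting flag survives the unlock.
allocator._vnpu_lock_gpu()

# ... serve requests while holding the device lock ...

# Cooperate: a holder periodically checks for higher-priority waiters and,
# if one exists, offloads its tensors and drops the lock.
if allocator.vnpu_has_higher_priority_waiter():
    allocator.vnpu_unlock_gpu()  # keep_wait defaults to False: leave the queue
```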
diff --git a/vllm_ascend/patch/platform/patch_core.py b/vllm_ascend/patch/platform/patch_core.py
index 829f3ff9..91f83b27 100644
--- a/vllm_ascend/patch/platform/patch_core.py
+++ b/vllm_ascend/patch/platform/patch_core.py
@@ -1,6 +1,8 @@
+from concurrent.futures import Future
 from logging import DEBUG
-import signal
 import queue
+import signal
+import time

 from vllm.config import ParallelConfig, VllmConfig
 from vllm.logger import logger
@@ -98,8 +100,13 @@ def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
         if engine_core is not None:
             engine_core.shutdown()

+
 def run_busy_loop(self):
     """Core busy loop of the EngineCore."""
+    # vNPU yield state
+    yield_probe_counter = 0
+    prepared_yield = False
+
     while self._handle_shutdown():
         # 1) Poll the input queue until there is work to do.
         self._process_input_queue()
@@ -111,14 +118,52 @@ def run_busy_loop(self):
             prev_is_self = self.model_executor.reload_vram()
             if not prev_is_self:
                 self.reset_prefix_cache()
+
         # 2) Step the engine core and return the outputs.
         self._process_engine_step()
-        if (
-            envs_ascend.VLLM_ASCEND_ENABLE_VNPU
-            and not self.has_work()
-            and not self.model_executor.is_offloaded()
-        ):
-            self.model_executor.offload_vram()
+
+        if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
+            if not self.has_work():
+                if not self.model_executor.is_offloaded():
+                    self.model_executor.offload_vram()
+            elif not prepared_yield:
+                # Probe whether we should yield every 10 steps.
+                yield_probe_counter = (yield_probe_counter + 1) % 10
+                if yield_probe_counter == 0:
+                    should_yield = self.model_executor.vnpu_has_higher_priority_waiter()
+                    if should_yield:
+                        logger.info(
+                            "Found a higher-priority waiter. Current engine will yield after finishing in-flight requests."
+                        )
+                        prepared_yield = True
+                        pause_future = self.pause_scheduler(
+                            mode="wait", clear_cache=True
+                        )
+
+                        def pause_complete(f: Future):
+                            nonlocal prepared_yield
+                            try:
+                                if f:
+                                    f.result()
+                                if not self.model_executor.is_offloaded():
+                                    self.model_executor.offload_vram(is_yield=True)
+                                prepared_yield = False
+                                logger.info("Current engine has yielded.")
+                                # The scheduler wakes itself up after yielding.
+                                # Sleep briefly to give other workers a chance
+                                # to take the lock.
+                                time.sleep(2)
+                                self.resume_scheduler()
+                            except Exception as e:
+                                logger.exception(f"Failed to yield: {e}")
+                                raise e
+
+                        if pause_future is None:
+                            # Pause finished immediately: no in-flight requests.
+                            pause_complete(None)
+                        else:
+                            # pause_future resolves after all in-flight
+                            # requests finish in _process_input_queue.
+                            pause_future.add_done_callback(pause_complete)

     raise SystemExit
@@ -142,7 +187,8 @@ def _process_input_queue(self):
             and not self.model_executor.is_offloaded()
         ):
             self.model_executor.offload_vram()
-        block = self.process_input_queue_block
+        # vNPU: if the scheduler has been resumed and has work, do not block.
+        block = self.process_input_queue_block and not self.has_work()
         try:
             req = self.input_queue.get(block=block)
             self._handle_client_request(*req)
@@ -169,6 +215,9 @@ EngineCoreProc._process_input_queue = _process_input_queue

 def DPEngineCoreProc_run_busy_loop(self):
     """Core busy loop of the EngineCore for data parallel case."""
+    # vNPU yield state
+    yield_probe_counter = 0
+    prepared_yield = False

     # Loop until process is sent a SIGINT or SIGTERM
     while self._handle_shutdown():
@@ -226,13 +275,49 @@ def DPEngineCoreProc_run_busy_loop(self):
                 # Increment wave count and reset step counter.
                 self.current_wave += 1
                 self.step_counter = 0
-
-        if (
-            envs_ascend.VLLM_ASCEND_ENABLE_VNPU
-            and not self.has_work()
-            and not self.model_executor.is_offloaded()
-        ):
-            self.model_executor.offload_vram()
+
+        if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
+            if not self.has_work():
+                if not self.model_executor.is_offloaded():
+                    self.model_executor.offload_vram()
+            elif not prepared_yield:
+                # Probe whether we should yield every 10 steps.
+                yield_probe_counter = (yield_probe_counter + 1) % 10
+                if yield_probe_counter == 0:
+                    should_yield = self.model_executor.vnpu_has_higher_priority_waiter()
+                    if should_yield:
+                        logger.info(
+                            "Found a higher-priority waiter. Current engine will yield after finishing in-flight requests."
+                        )
+                        prepared_yield = True
+                        pause_future = self.pause_scheduler(
+                            mode="wait", clear_cache=True
+                        )
+
+                        def pause_complete(f: Future):
+                            nonlocal prepared_yield
+                            try:
+                                if f:
+                                    f.result()
+                                if not self.model_executor.is_offloaded():
+                                    self.model_executor.offload_vram(is_yield=True)
+                                prepared_yield = False
+                                logger.info("Current engine has yielded.")
+                                # The scheduler wakes itself up after yielding.
+                                # Sleep briefly to give other workers a chance
+                                # to take the lock.
+                                time.sleep(2)
+                                self.resume_scheduler()
+                            except Exception as e:
+                                logger.exception(f"Failed to yield: {e}")
+                                raise e
+
+                        if pause_future is None:
+                            # Pause finished immediately: no in-flight requests.
+                            pause_complete(None)
+                        else:
+                            # pause_future resolves after all in-flight
+                            # requests finish in _process_input_queue.
+                            pause_future.add_done_callback(pause_complete)

     raise SystemExit
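The yield sequence added to both busy loops is easier to see as a compact state machine: probe, pause, offload with `is_yield=True`, sleep, resume. A sketch, not the patched code itself (`engine` stands in for the EngineCoreProc and `state` for the two loop-local variables):

```python
import time

def maybe_yield(engine, state) -> None:
    """Per-step yield probe, called while the engine still has work."""
    if state.prepared_yield:
        return                                # a yield is already in flight
    state.counter = (state.counter + 1) % 10  # probe every 10th step
    if state.counter != 0:
        return
    if not engine.model_executor.vnpu_has_higher_priority_waiter():
        return
    state.prepared_yield = True
    future = engine.pause_scheduler(mode="wait", clear_cache=True)

    def on_paused(f) -> None:
        if f is not None:
            f.result()                        # surface pause errors
        if not engine.model_executor.is_offloaded():
            engine.model_executor.offload_vram(is_yield=True)
        state.prepared_yield = False
        time.sleep(2)                         # give the waiter time to lock
        engine.resume_scheduler()

    if future is None:                        # no in-flight requests
        on_paused(None)
    else:                                     # runs once requests drain
        future.add_done_callback(on_paused)
```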
diff --git a/vllm_ascend/patch/platform/patch_executor.py b/vllm_ascend/patch/platform/patch_executor.py
index f3e6f8e0..4b7a89ea 100644
--- a/vllm_ascend/patch/platform/patch_executor.py
+++ b/vllm_ascend/patch/platform/patch_executor.py
@@ -8,7 +8,12 @@ def is_offloaded(self) -> bool:
         self._is_offloaded = False
     return self._is_offloaded

-def offload_vram(self):
+def is_yielded(self) -> bool:
+    if not hasattr(self, "_is_yielded"):
+        self._is_yielded = False
+    return self._is_yielded
+
+def offload_vram(self, is_yield: bool = False):
     if self.is_offloaded():
         logger.warning("Executor is already offloaded.")
         return
@@ -17,14 +22,18 @@ def offload_vram(self):

     time_after_offload = time.perf_counter()
     self._is_offloaded = True
-    logger.info(f"Offloading VRAM costs {time_after_offload - time_before_offload:.6f} seconds.")
-
+    if is_yield:
+        self._is_yielded = True
+    logger.info(
+        f"Offloading VRAM costs {time_after_offload - time_before_offload:.3f} seconds."
+    )

 def reload_vram(self) -> bool:
     if not self.is_offloaded():
         logger.warning("Executor is not offloaded.")
         return True

+    is_waiting = False
     while True:
         time_before_reload = time.perf_counter()
         res = self.collective_rpc("try_reload_vram")
@@ -33,15 +42,28 @@ def reload_vram(self) -> bool:
         succ = all(x[0] for x in res)
         if succ:
             self._is_offloaded = False
-            logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
+            self._is_yielded = False
             prev_is_self = all(x[1] for x in res)
+            if is_waiting:
+                self.collective_rpc("vnpu_cancel_wait")
+            logger.info(
+                f"Reloading VRAM costs {time_after_reload - time_before_reload:.3f} seconds."
+            )
             return prev_is_self
         else:
             # some workers did not get the lock
-            self.collective_rpc("vnpu_unlock_gpu")
+            if not is_waiting:
+                self.collective_rpc("vnpu_start_wait")
+                is_waiting = True
+            self.collective_rpc("vnpu_unlock_gpu", kwargs={"keep_wait": True})
             time.sleep(0.001)

+def vnpu_has_higher_priority_waiter(self) -> bool:
+    res = self.collective_rpc("vnpu_has_higher_priority_waiter")
+    return any(res)
+

 Executor.is_offloaded = is_offloaded
+Executor.is_yielded = is_yielded
 Executor.offload_vram = offload_vram
 Executor.reload_vram = reload_vram
+Executor.vnpu_has_higher_priority_waiter = vnpu_has_higher_priority_waiter
diff --git a/vllm_ascend/worker/worker.py b/vllm_ascend/worker/worker.py
index 9a17dfdd..c1f4dfb9 100644
--- a/vllm_ascend/worker/worker.py
+++ b/vllm_ascend/worker/worker.py
@@ -470,7 +470,7 @@ class NPUWorker(WorkerBase):
             # save memory to host with lock
             self.offload_vram()
             succ, _ = self.try_reload_vram()
-            assert succ, "Failed to reload model weights after offloading."
+            # assert succ, "Failed to reload model weights after offloading."

     def offload_vram(self) -> None:
         allocator = CaMemAllocator.get_instance()
@@ -480,9 +480,21 @@ class NPUWorker(WorkerBase):
         allocator = CaMemAllocator.get_instance()
         return allocator.try_reload_vram(tags=None)

-    def vnpu_unlock_gpu(self) -> None:
+    def vnpu_unlock_gpu(self, keep_wait: bool = False) -> None:
         allocator = CaMemAllocator.get_instance()
-        allocator.vnpu_unlock_gpu()
+        allocator.vnpu_unlock_gpu(keep_wait)
+
+    def vnpu_start_wait(self) -> None:
+        allocator = CaMemAllocator.get_instance()
+        allocator.vnpu_start_wait()
+
+    def vnpu_cancel_wait(self) -> None:
+        allocator = CaMemAllocator.get_instance()
+        allocator.vnpu_cancel_wait()
+
+    def vnpu_has_higher_priority_waiter(self) -> bool:
+        allocator = CaMemAllocator.get_instance()
+        return allocator.vnpu_has_higher_priority_waiter()

     def compile_or_warm_up_model(self) -> float:
         # Note: need to adapt for graph mode.