From 2a571d8bc87958ba47cda3db7609f309455e8fd3 Mon Sep 17 00:00:00 2001 From: starkwj Date: Thu, 8 Jan 2026 06:54:33 +0000 Subject: [PATCH] support multi npu partially --- README.md | 3 +- csrc/camem_allocator.cpp | 33 +++-- csrc/idle_offload/npu_helper.h | 86 ++++++++++++ csrc/idle_offload/offload_daemon.cpp | 135 ++++++++++--------- csrc/idle_offload/shm_helper.h | 24 ++-- csrc/idle_offload/shm_manager.cpp | 27 ++-- csrc/idle_offload/shm_manager.h | 3 +- csrc/idle_offload/shm_worker.cpp | 90 ++++++++----- csrc/idle_offload/shm_worker.h | 12 +- vllm_ascend/device_allocator/camem.py | 44 +++--- vllm_ascend/patch/platform/patch_executor.py | 21 ++- vllm_ascend/worker/worker_v1.py | 13 +- 12 files changed, 331 insertions(+), 160 deletions(-) create mode 100644 csrc/idle_offload/npu_helper.h diff --git a/README.md b/README.md index c55d37f..78aeca5 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,5 @@ docker build -t vllm-ascend-multi-llm:latest -f ./Dockerfile . ## Limitations -- This project only support share a single NPU currently. This is also limited by the fact that HCCL cannot be shared. +- Restricted by the fact that HCCL cannot be shared, deploying more than one model with multi-GPU (e.g., TP) is not feasible currently. - The prefix cache will be reset when the LLM is restored, since we just simply discard the KV cache when the LLM is offloaded. - diff --git a/csrc/camem_allocator.cpp b/csrc/camem_allocator.cpp index 5b0fa9a..e88c695 100644 --- a/csrc/camem_allocator.cpp +++ b/csrc/camem_allocator.cpp @@ -20,6 +20,7 @@ #include #include "idle_offload/shm_worker.h" +#include "idle_offload/npu_helper.h" extern "C" { @@ -474,8 +475,10 @@ static PyObject* python_create_and_map(PyObject* self, PyObject* args) { static PyObject* py_init_module_offload(PyObject* self, PyObject* args) { PyObject* malloc_callback = nullptr; PyObject* free_callback = nullptr; + unsigned long long device = 0; - if (!PyArg_ParseTuple(args, "OO", &malloc_callback, &free_callback)) { + if (!PyArg_ParseTuple(args, "OOK", &malloc_callback, &free_callback, + &device)) { return nullptr; } @@ -497,7 +500,13 @@ static PyObject* py_init_module_offload(PyObject* self, PyObject* args) { } g_initialized.store(true); - shm_worker = new ShmWorker(); + std::vector gpu_ids = get_npu_ids(); + if (device >= gpu_ids.size()) { + throw std::runtime_error("Invalid device id: " + std::to_string(device) + + " " + __FILE__ + ":" + std::to_string(__LINE__)); + } + int gpu_id = gpu_ids[device]; + // get pid aclError error_code; int32_t pid; @@ -508,11 +517,12 @@ static PyObject* py_init_module_offload(PyObject* self, PyObject* args) { std::to_string(error_code) + " " + __FILE__ + ":" + std::to_string(__LINE__)); } + + shm_worker = new ShmWorker(); uint64_t shareable_handle; - shm_worker->register_worker(pid, &shareable_handle, &g_size); + shm_worker->register_worker(pid, gpu_id, &shareable_handle, &g_size); // import shareable handle - uint32_t device = 0; aclrtDrvMemHandle memHandle; error_code = aclrtMemImportFromShareableHandle(shareable_handle, device, &memHandle); @@ -570,9 +580,16 @@ static PyObject* python_get_mem_info_offload(PyObject* self, PyObject* args) { return tuple; } -static PyObject* python_lock_gpu_offload(PyObject* self, PyObject* args) { - bool prev_is_self = shm_worker->lock_gpu(); - return PyBool_FromLong(prev_is_self); +static PyObject* python_try_lock_gpu_offload(PyObject* self, PyObject* args) { + bool prev_is_self = false; + bool success = shm_worker->try_lock_gpu(prev_is_self); + PyObject* tuple = PyTuple_New(2); + if (!tuple) { + return nullptr; + } + PyTuple_SetItem(tuple, 0, PyBool_FromLong(success)); + PyTuple_SetItem(tuple, 1, PyBool_FromLong(prev_is_self)); + return tuple; } static PyObject* python_unlock_gpu_offload(PyObject* self, PyObject* args) { @@ -597,7 +614,7 @@ static PyMethodDef module_methods[] = { "Unmap and release memory on the device."}, {"python_get_mem_info_offload", (PyCFunction)python_get_mem_info_offload, METH_NOARGS, "Get mem info in the reserved pool."}, - {"python_lock_gpu_offload", (PyCFunction)python_lock_gpu_offload, + {"python_try_lock_gpu_offload", (PyCFunction)python_try_lock_gpu_offload, METH_NOARGS, "Lock GPU."}, {"python_unlock_gpu_offload", (PyCFunction)python_unlock_gpu_offload, METH_NOARGS, "Unlock GPU."}, diff --git a/csrc/idle_offload/npu_helper.h b/csrc/idle_offload/npu_helper.h new file mode 100644 index 0000000..7280b64 --- /dev/null +++ b/csrc/idle_offload/npu_helper.h @@ -0,0 +1,86 @@ +#include +#include +#include +#include +#include + +#include "spdlog/spdlog.h" + +#include "acl/acl.h" + +std::vector get_available_devices() { + namespace fs = std::filesystem; + + std::vector devices; + const std::string dev_path = "/dev"; + const std::string prefix = "davinci"; + + if (!fs::exists(dev_path)) { + return devices; + } + + try { + for (const auto &entry : fs::directory_iterator(dev_path)) { + std::string filename = entry.path().filename().string(); + if (filename.rfind(prefix, 0) == 0) { + + std::string suffix = filename.substr(prefix.length()); + + // filter not digit suffix + if (!suffix.empty() && + std::all_of(suffix.begin(), suffix.end(), + [](unsigned char c) { return std::isdigit(c); })) { + try { + int id = std::stoi(suffix); + devices.push_back(id); + } catch (...) { + } + } + } + } + } catch (const fs::filesystem_error &e) { + spdlog::error("Error accessing /dev: {}", e.what()); + } + + std::sort(devices.begin(), devices.end()); + return devices; +} + +std::vector get_npu_ids() { + std::vector available_devices = get_available_devices(); + std::vector npu_ids; + uint32_t device_count = 0; + aclError error_code = aclrtGetDeviceCount(&device_count); + if (error_code != 0) { + spdlog::error("Failed to get NPU device count, error code: {}", error_code); + throw std::runtime_error("Failed to get NPU device count"); + } + if (device_count > available_devices.size()) { + spdlog::error("The number of available NPU devices ({}) is less than the " + "number of devices reported by ACL ({}).", + available_devices.size(), device_count); + throw std::runtime_error("Inconsistent NPU device count"); + } + + const char *env_available_npu = getenv("ASCEND_RT_VISIBLE_DEVICES"); + if (env_available_npu) { + std::string npu_str(env_available_npu); + size_t start = 0; + while (start < npu_str.size()) { + size_t next = npu_str.find(',', start); + if (next == std::string::npos) { + next = npu_str.size(); + } + int device_id = std::stoi(npu_str.substr(start, next - start)); + npu_ids.push_back(available_devices[device_id]); + start = next + 1; + if (npu_ids.size() >= device_count) { + break; + } + } + } else { + npu_ids.insert(npu_ids.end(), available_devices.begin(), + available_devices.begin() + device_count); + } + return npu_ids; +} diff --git a/csrc/idle_offload/offload_daemon.cpp b/csrc/idle_offload/offload_daemon.cpp index cdbf393..8a4536d 100644 --- a/csrc/idle_offload/offload_daemon.cpp +++ b/csrc/idle_offload/offload_daemon.cpp @@ -13,6 +13,7 @@ #include "acl/acl.h" #include "shm_manager.h" +#include "npu_helper.h" #include "spdlog/spdlog.h" @@ -49,8 +50,6 @@ void ensure_context(unsigned long long device) { void init_acl() { int32_t deviceId=0; - // aclrtStream stream; - bool g_isDevice; aclError ret = aclrtSetDevice(deviceId); if (ret != ACL_ERROR_NONE) { @@ -59,7 +58,8 @@ void init_acl() { } } -void reset_pids(const std::vector &pids, uint64_t shareable_handle) { +void reset_pids(const std::vector &pids, + const std::vector &shareable_handles) { int cnt = pids.size(); if (cnt <= 0) { return; @@ -68,21 +68,21 @@ void reset_pids(const std::vector &pids, uint64_t shareable_handle) { int32_t pids_data[cnt]; memcpy(pids_data, pids.data(), cnt * sizeof(int32_t)); - aclError error_code = - aclrtMemSetPidToShareableHandle(shareable_handle, pids_data, cnt); - if (error_code != 0) { - spdlog::error("aclrtMemSetPidToShareableHandle failed, error_code: {}", - error_code); - throw std::runtime_error("aclrtMemSetPidToShareableHandle failed"); - } else { - spdlog::info("aclrtMemSetPidToShareableHandle succeeded, num_pids: {}", - cnt); + for (int i = 0; i < shareable_handles.size(); ++i) { + uint64_t shareable_handle = shareable_handles[i]; + aclError error_code = + aclrtMemSetPidToShareableHandle(shareable_handle, pids_data, cnt); + if (error_code != 0) { + spdlog::error("aclrtMemSetPidToShareableHandle failed, error_code: {}", + error_code); + throw std::runtime_error("aclrtMemSetPidToShareableHandle failed"); + } } + spdlog::info("aclrtMemSetPidToShareableHandle succeeded, num_pids: {}", cnt); } -void start_daemon() { - init_acl(); - +void alloc_physical(uint32_t device_id, aclrtDrvMemHandle &out_mem_handle, + size_t &out_g_size) { aclError error_code; size_t free_mem = 0, total = 0; error_code = aclrtGetMemInfo(ACL_HBM_MEM, &free_mem, &total); @@ -94,12 +94,11 @@ void start_daemon() { total); } - uint32_t device = 0; aclrtPhysicalMemProp prop = {}; prop.handleType = ACL_MEM_HANDLE_TYPE_NONE; prop.allocationType = ACL_MEM_ALLOCATION_TYPE_PINNED; prop.memAttr = ACL_HBM_MEM_HUGE; - prop.location.id = device; + prop.location.id = device_id; prop.location.type = ACL_MEM_LOCATION_TYPE_DEVICE; prop.reserve = 0; @@ -107,7 +106,8 @@ void start_daemon() { error_code = aclrtMemGetAllocationGranularity( &prop, ACL_RT_MEM_ALLOC_GRANULARITY_MINIMUM, &granularity); if (error_code != 0) { - spdlog::error("aclrtMemGetAllocationGranularity failed, error_code: {}", error_code); + spdlog::error("aclrtMemGetAllocationGranularity failed, error_code: {}", + error_code); throw std::runtime_error("aclrtMemGetAllocationGranularity failed"); } else { spdlog::info("aclrtMemGetAllocationGranularity succeeded, granularity: {}", @@ -118,59 +118,68 @@ void start_daemon() { reserved_mem_size, free_mem); throw std::runtime_error("Not enough free memory to reserve"); } - size_t g_size = free_mem - reserved_mem_size; - g_size = (g_size / granularity) * granularity; + out_g_size = free_mem - reserved_mem_size; + out_g_size = (out_g_size / granularity) * granularity; // allocate physical memory - aclrtDrvMemHandle mem_handle; - error_code = aclrtMallocPhysical(&mem_handle, g_size, &prop, 0); + error_code = aclrtMallocPhysical(&out_mem_handle, out_g_size, &prop, 0); if (error_code != 0) { spdlog::error("aclrtMallocPhysical failed, error_code: {}", error_code); throw std::runtime_error("aclrtMallocPhysical failed"); } else { - spdlog::info("aclrtMallocPhysical succeeded, size: {}", g_size); + spdlog::info("device {} aclrtMallocPhysical succeeded, size: {}", device_id, + out_g_size); } +} - // // reserve address - // void *vmem_addr = nullptr; - // error_code = aclrtReserveMemAddress(&vmem_addr, g_size, 0, nullptr, 0); - // if (error_code != 0) { - // spdlog::error("aclrtReserveMemAddress failed, error_code: {}", error_code); - // throw std::runtime_error("aclrtReserveMemAddress failed"); - // } else { - // spdlog::info("aclrtReserveMemAddress succeeded, vmem_addr: {}", vmem_addr); - // } - - // // map - // error_code = aclrtMapMem(vmem_addr, g_size, 0, mem_handle, 0); - // if (error_code != 0) { - // spdlog::error("aclrtMapMem failed, error_code: {}", error_code); - // throw std::runtime_error("aclrtMapMem failed"); - // } else { - // spdlog::info("aclrtMapMem succeeded, vmem_addr: {}", vmem_addr); - // } - - // export - uint64_t shareable_handle; - error_code = aclrtMemExportToShareableHandle( - mem_handle, ACL_MEM_HANDLE_TYPE_NONE, ACL_RT_VMM_EXPORT_FLAG_DEFAULT, - &shareable_handle); - if (error_code != 0) { - spdlog::error("aclrtMemExportToShareableHandle failed, error_code: {}", - error_code); - throw std::runtime_error("aclrtMemExportToShareableHandle failed"); - } else { - spdlog::info( - "aclrtMemExportToShareableHandle succeeded, shareable_handle: {}", - shareable_handle); - } +void start_daemon() { + init_acl(); + std::vector npu_ids = get_npu_ids(); + std::vector mem_handles; + std::vector shareable_handles; // shm shm_manager = new ShmManager(); - shm_manager->set_gpu_info(g_size, shareable_handle); + + for (int i = 0; i < npu_ids.size(); ++i) { + uint32_t device_id = i; + int npu_id = npu_ids[i]; + spdlog::info("Setting up device id {} - npu id {}", device_id, npu_id); + aclError error_code = aclrtSetDevice(device_id); + if (error_code != ACL_ERROR_NONE) { + throw std::runtime_error("aclrtSetDevice failed with acl error code: " + + std::to_string(error_code) + " " + __FILE__ + + ":" + std::to_string(__LINE__)); + } + + // alloc physical + aclrtDrvMemHandle mem_handle; + size_t g_size; + alloc_physical(device_id, mem_handle, g_size); + mem_handles.push_back(mem_handle); + + // export + uint64_t shareable_handle; + error_code = aclrtMemExportToShareableHandle( + mem_handle, ACL_MEM_HANDLE_TYPE_NONE, ACL_RT_VMM_EXPORT_FLAG_DEFAULT, + &shareable_handle); + if (error_code != 0) { + spdlog::error("aclrtMemExportToShareableHandle failed, error_code: {}", + error_code); + throw std::runtime_error("aclrtMemExportToShareableHandle failed"); + } else { + spdlog::info( + "aclrtMemExportToShareableHandle succeeded, shareable_handle: {}", + shareable_handle); + } + + shm_manager->set_gpu_info(npu_id, g_size, shareable_handle); + shareable_handles.push_back(shareable_handle); + } + shm_manager->register_callback_on_worker_change( [&](const std::vector &pids) { - reset_pids(pids, shareable_handle); + reset_pids(pids, shareable_handles); }); // start busy loop @@ -181,10 +190,12 @@ void start_daemon() { shm_manager = nullptr; // free physical memory - error_code = aclrtFreePhysical(mem_handle); - if (error_code != 0) { - spdlog::error("aclrtFreePhysical failed, error_code: {}", error_code); - throw std::runtime_error("aclrtFreePhysical failed"); + for (auto mem_handle : mem_handles) { + aclError error_code = aclrtFreePhysical(mem_handle); + if (error_code != 0) { + spdlog::error("aclrtFreePhysical failed, error_code: {}", error_code); + throw std::runtime_error("aclrtFreePhysical failed"); + } } } diff --git a/csrc/idle_offload/shm_helper.h b/csrc/idle_offload/shm_helper.h index e563902..74aa979 100644 --- a/csrc/idle_offload/shm_helper.h +++ b/csrc/idle_offload/shm_helper.h @@ -19,6 +19,7 @@ #define MAX_WORKERS 60 +#define MAX_DEVICES 16 // static constexpr const char *SHM_NAME = "/vllm_acl_vnpu_offload_shm"; static inline std::string get_shm_name() { const char *env_shm_name = getenv("VLLM_IDLE_OFFLOAD_SHM_NAME"); @@ -69,11 +70,14 @@ static inline uint64_t pack_unlocked_tgid(int32_t tgid) { // mmap usually page-aligned struct alignas(64) ShmHelper { + struct VramInfo { + uint64_t total_vmem_size; + uint64_t shareable_handle; + }; + VramInfo vram_info[MAX_DEVICES]; // support max 16 NPUs // GPU lock flag - std::atomic gpu_flag; - uint64_t total_vmem_size; - uint64_t shareable_handle; - uint8_t _padding[64 - sizeof(std::atomic) - 2 * sizeof(uint64_t)]; + std::atomic gpu_flag[MAX_DEVICES]; + // uint8_t _padding1[64 - sizeof(std::atomic)]; // request enum RequestType: uint32_t { @@ -105,14 +109,16 @@ struct alignas(64) ShmHelper { WorkerHeartBeat heart_beats[MAX_WORKERS]; void init() { - gpu_flag.store(0, std::memory_order_release); + memset(vram_info, 0, sizeof(vram_info)); + for (size_t i = 0; i < MAX_DEVICES; ++i) { + gpu_flag[i].store(0, std::memory_order_release); + } req_ready.store(READY_STATE_NO_REQUEST, std::memory_order_release); } - void set_gpu_info(uint64_t vmem_size, uint64_t shared_handle) { - total_vmem_size = vmem_size; - shareable_handle = shared_handle; - init(); + void set_gpu_info(int gpu_id, uint64_t vmem_size, uint64_t shared_handle) { + vram_info[gpu_id].total_vmem_size = vmem_size; + vram_info[gpu_id].shareable_handle = shared_handle; } }; diff --git a/csrc/idle_offload/shm_manager.cpp b/csrc/idle_offload/shm_manager.cpp index b87c811..bfeea96 100644 --- a/csrc/idle_offload/shm_manager.cpp +++ b/csrc/idle_offload/shm_manager.cpp @@ -1,7 +1,6 @@ #include "shm_manager.h" #include -// === ShmManager === ShmManager::ShmManager() { std::string shm_name = get_shm_name(); @@ -37,11 +36,12 @@ ShmManager::~ShmManager() { shm_unlink(shm_name.c_str()); } -void ShmManager::set_gpu_info(uint64_t vmem_size, uint64_t shared_handle) { - shm_helper->set_gpu_info(vmem_size, shared_handle); +void ShmManager::set_gpu_info(int gpu_id, uint64_t vmem_size, + uint64_t shared_handle) { + shm_helper->set_gpu_info(gpu_id, vmem_size, shared_handle); + this->valid_gpu_ids.push_back(gpu_id); } - void ShmManager::run_busy_loop() { if (!cb_on_worker_change) { spdlog::error("cb_on_worker_change is not set"); @@ -155,14 +155,17 @@ void ShmManager::check_heart_beats() { shm_helper->heart_beats[i].timestamp.store(0, std::memory_order_release); // check dead lock - uint64_t gpu_flag = - shm_helper->gpu_flag.load(std::memory_order_acquire); - if (unpack_lock_field(gpu_flag) == 1 && - unpack_tgid_field(gpu_flag) == tgid) { - // release lock held by dead worker - spdlog::warn("Releasing GPU lock held by dead worker TGID {}", tgid); - shm_helper->gpu_flag.store(pack_unlocked_tgid(tgid), - std::memory_order_release); + for (int gpu_id : valid_gpu_ids) { + uint64_t gpu_flag = + shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire); + if (unpack_lock_field(gpu_flag) == 1 && + unpack_tgid_field(gpu_flag) == tgid) { + // release lock held by dead worker + spdlog::warn("Releasing GPU {} lock held by dead worker TGID {}", + gpu_id, tgid); + shm_helper->gpu_flag[gpu_id].store(pack_unlocked_tgid(tgid), + std::memory_order_release); + } } local_worker_tgids[i] = 0; alive_worker_tgids.erase(std::remove(alive_worker_tgids.begin(), diff --git a/csrc/idle_offload/shm_manager.h b/csrc/idle_offload/shm_manager.h index 48d1a3b..9293280 100644 --- a/csrc/idle_offload/shm_manager.h +++ b/csrc/idle_offload/shm_manager.h @@ -7,7 +7,7 @@ class ShmManager { ShmManager(); ~ShmManager(); - void set_gpu_info(uint64_t vmem_size, uint64_t shared_handle); + void set_gpu_info(int gpu_id, uint64_t vmem_size, uint64_t shared_handle); // request void process_requests(); @@ -29,6 +29,7 @@ class ShmManager { ShmHelper *shm_helper; std::vector local_worker_tgids; std::vector alive_worker_tgids; + std::vector valid_gpu_ids; std::atomic stop_loop_flag; std::function &)> cb_on_worker_change; }; diff --git a/csrc/idle_offload/shm_worker.cpp b/csrc/idle_offload/shm_worker.cpp index 8b22b3e..e1dbc25 100644 --- a/csrc/idle_offload/shm_worker.cpp +++ b/csrc/idle_offload/shm_worker.cpp @@ -1,6 +1,5 @@ #include "shm_worker.h" -// === ShmWorker === ShmWorker::ShmWorker() { std::string shm_name = get_shm_name(); @@ -29,16 +28,22 @@ ShmWorker::~ShmWorker() { munmap(shm_helper, SHM_SIZE); } -bool ShmWorker::register_worker(int32_t tgid, uint64_t *out_shareable_handle, +bool ShmWorker::register_worker(int32_t tgid, int gpu_id, + uint64_t *out_shareable_handle, uint64_t *out_vmem_size) { + if (gpu_id < 0 || gpu_id >= MAX_DEVICES) { + spdlog::error("Invalid GPU ID {}", gpu_id); + throw std::runtime_error("Invalid GPU ID"); + } this->tgid = tgid; + this->gpu_id = gpu_id; int slot = register_worker_shm(); if (slot == -1) { return false; } - *out_shareable_handle = shm_helper->shareable_handle; - *out_vmem_size = shm_helper->total_vmem_size; + *out_shareable_handle = shm_helper->vram_info[gpu_id].shareable_handle; + *out_vmem_size = shm_helper->vram_info[gpu_id].total_vmem_size; stop_heart_beat.store(false, std::memory_order_release); heart_beat_thread = std::thread(&ShmWorker::heart_beat_loop, this, slot); @@ -68,47 +73,62 @@ void ShmWorker::heart_beat_loop(int slot) { } } -bool ShmWorker::lock_gpu() { - int retry_cnt = 0; - uint64_t old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire); +bool ShmWorker::try_lock_gpu(bool &out_self_hold) { + static int retry_cnt = 0; + + uint64_t old_flag = + shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire); + if (unpack_lock_field(old_flag) == 0) { // free + uint64_t new_flag = pack_locked_tgid(tgid); + if (shm_helper->gpu_flag[gpu_id].compare_exchange_weak( + old_flag, new_flag, std::memory_order_acq_rel, + std::memory_order_acquire)) { + spdlog::info("TGID {} acquired GPU {} lock", tgid, gpu_id); + int32_t prev_tgid = unpack_tgid_field(old_flag); + out_self_hold = prev_tgid == tgid; + retry_cnt = 0; + return true; + } + } else { // locked + if (unpack_tgid_field(old_flag) == tgid) { + spdlog::info("TGID {} already holds the GPU {} lock", tgid, gpu_id); + out_self_hold = true; + retry_cnt = 0; + return true; + } + } + // failed + if (++retry_cnt % 2000 == 0) { + spdlog::info( + "TGID {} trying to acquire GPU {} lock, current lock holder TGID {}", + tgid, gpu_id, unpack_tgid_field(old_flag)); + } + out_self_hold = false; + return false; +} + +bool ShmWorker::lock_gpu(bool &out_self_hold) { while (true) { - if (unpack_lock_field(old_flag) == 0) { - uint64_t new_flag = pack_locked_tgid(tgid); - if (shm_helper->gpu_flag.compare_exchange_weak(old_flag, new_flag, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - spdlog::info("TGID {} acquired GPU lock", tgid); - int32_t old_tgid = unpack_tgid_field(old_flag); - return old_tgid == tgid; - } - } else { - if (unpack_tgid_field(old_flag) == tgid) { - spdlog::info("TGID {} already holds the GPU lock", tgid); - return true; - } + if (try_lock_gpu(out_self_hold)) { + return true; } // failed - ++retry_cnt; - if (retry_cnt % 1000 == 0) { - spdlog::info( - "TGID {} waiting for GPU lock, current lock holder TGID {}", tgid, - unpack_tgid_field(old_flag)); - } usleep(1000); - old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire); } } void ShmWorker::unlock_gpu() { - uint64_t old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire); + uint64_t old_flag = + shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire); if (unpack_tgid_field(old_flag) != tgid) { - spdlog::warn("previous gpu flag {} does not match expected locked flag for " - "TGID {}. This may be a bug, unless during startup.", - old_flag, tgid); + // spdlog::warn("previous gpu flag {} does not match expected locked flag for " + // "TGID {}. This may be a bug, unless during startup.", + // old_flag, tgid); + spdlog::info("TGID {} does not hold GPU {} lock", tgid, gpu_id); } else { uint64_t new_flag = pack_unlocked_tgid(tgid); - shm_helper->gpu_flag.store(new_flag, std::memory_order_release); - spdlog::info("TGID {} released GPU lock", tgid); + shm_helper->gpu_flag[gpu_id].store(new_flag, std::memory_order_release); + spdlog::info("TGID {} released GPU {} lock", tgid, gpu_id); } } @@ -142,7 +162,7 @@ uint64_t ShmWorker::make_request(uint32_t type, uint64_t parameter) { uint64_t response = shm_helper->request.response; // set ready to 0 shm_helper->req_ready.store(ShmHelper::READY_STATE_NO_REQUEST, - std::memory_order_release); + std::memory_order_release); return response; } diff --git a/csrc/idle_offload/shm_worker.h b/csrc/idle_offload/shm_worker.h index b005f0c..cbd4e4c 100644 --- a/csrc/idle_offload/shm_worker.h +++ b/csrc/idle_offload/shm_worker.h @@ -1,21 +1,27 @@ #pragma once +#include +#include +#include + #include "shm_helper.h" + class ShmWorker { public: ShmWorker(); ~ShmWorker(); - bool register_worker(int32_t tgid, uint64_t *out_shareable_handle, + bool register_worker(int32_t tgid, int gpu_id, uint64_t *out_shareable_handle, uint64_t *out_vmem_size); - bool lock_gpu(); + bool try_lock_gpu(bool &out_self_hold); + bool lock_gpu(bool &out_self_hold); void unlock_gpu(); - private: int32_t tgid; + int gpu_id; ShmHelper *shm_helper; std::thread heart_beat_thread; std::atomic stop_heart_beat; diff --git a/vllm_ascend/device_allocator/camem.py b/vllm_ascend/device_allocator/camem.py index bc30f59..f6f451b 100644 --- a/vllm_ascend/device_allocator/camem.py +++ b/vllm_ascend/device_allocator/camem.py @@ -63,14 +63,14 @@ try: init_module_offload as init_module, python_create_and_map_offload as python_create_and_map,python_unmap_and_release_offload as python_unmap_and_release, python_get_mem_info_offload as python_get_mem_info, - python_lock_gpu_offload as python_lock_gpu, + python_try_lock_gpu_offload as python_try_lock_gpu, python_unlock_gpu_offload as python_unlock_gpu ) else: from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401 init_module, python_create_and_map, python_unmap_and_release) python_get_mem_info = None - python_lock_gpu = None + python_try_lock_gpu = None python_unlock_gpu = None lib_name = find_loaded_library("vllm_ascend_C") camem_available = True @@ -81,7 +81,7 @@ except ImportError as e: python_create_and_map = None python_unmap_and_release = None python_get_mem_info = None - python_lock_gpu = None + python_try_lock_gpu = None python_unlock_gpu = None lib_name = None libcudart = None @@ -109,12 +109,14 @@ def get_pluggable_allocator( python_malloc_fn: Callable[[tuple[int, int, int, int]], None], python_free_func: Callable[[int], tuple[int, int, int, int]] ) -> torch.npu.memory.NPUPluggableAllocator: - init_module(python_malloc_fn, python_free_func) if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD: + current_device = torch.npu.current_device() + init_module(python_malloc_fn, python_free_func, current_device) new_alloc = torch.npu.memory.NPUPluggableAllocator( lib_name, 'my_malloc_offload', 'my_free_offload' ) else: + init_module(python_malloc_fn, python_free_func) new_alloc = torch.npu.memory.NPUPluggableAllocator( lib_name, 'my_malloc', 'my_free' ) @@ -280,7 +282,7 @@ class CaMemAllocator: self.allocator_and_pools[tag] = data # lock gpu if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD: - self.vnpu_lock_gpu() + self._vnpu_lock_gpu() yield # PyTorch's bug, calling torch.cuda.empty_cache() will error # when using pluggable allocator, see @@ -306,12 +308,18 @@ class CaMemAllocator: sum_bytes += handle[1] return sum_bytes - - def vnpu_lock_gpu(self) -> bool: - if python_lock_gpu: - return python_lock_gpu() + def vnpu_try_lock_gpu(self) -> tuple[bool, bool]: + if python_try_lock_gpu: + return python_try_lock_gpu() else: - return False + return False, False + + def _vnpu_lock_gpu(self) -> bool: + while True: + success, _ = self.vnpu_try_lock_gpu() + if success: + return True + time.sleep(0.001) def vnpu_unlock_gpu(self): @@ -373,15 +381,15 @@ class CaMemAllocator: self.vnpu_unlock_gpu() # logger.info(f"offload: tags {offload_tags}: {sz_weights/(1024**3):.2f} GB, discard kv cache: {sz_kvcache/(1024**3):.2f} GB") - def reload_vram(self, tags: Optional[list[str]] = None) -> bool: - """ - Wake up the allocator from sleep mode. - All data that is previously offloaded will be loaded back to GPU - memory, and the rest of the data will have empty memory.""" - prev_is_self = self.vnpu_lock_gpu() + def try_reload_vram(self, tags: Optional[list[str]] = None) -> tuple[bool, bool]: + succ, prev_is_self = self.vnpu_try_lock_gpu() + if not succ: + # not get the lock + return False, prev_is_self + if prev_is_self: # nothing to do - return True + return succ, prev_is_self for ptr, data in self.pointer_to_data.items(): handle = data.handle @@ -401,4 +409,4 @@ class CaMemAllocator: # else: # size_in_bytes = handle[1] # memset(ptr, size_in_bytes, 0, size_in_bytes) - return False + return succ, prev_is_self diff --git a/vllm_ascend/patch/platform/patch_executor.py b/vllm_ascend/patch/platform/patch_executor.py index 2c558b4..e3a6a32 100644 --- a/vllm_ascend/patch/platform/patch_executor.py +++ b/vllm_ascend/patch/platform/patch_executor.py @@ -26,12 +26,21 @@ def reload_vram(self) -> bool: logger.warning("Executor is not offloaded.") return True - time_before_reload = time.perf_counter() - prev_is_self = self.collective_rpc("reload_vram") - time_after_reload = time.perf_counter() - self.is_offloaded = False - logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.") - return all(prev_is_self) + while True: + time_before_reload = time.perf_counter() + res = self.collective_rpc("try_reload_vram") + time_after_reload = time.perf_counter() + + succ = all(x[0] for x in res) + if succ: + self.is_offloaded = False + logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.") + prev_is_self = all(x[1] for x in res) + return prev_is_self + else: + # some workers not get lock + self.collective_rpc("vnpu_unlock_gpu") + time.sleep(0.001) def determine_available_memory_idle_offload_mode(self) -> int: diff --git a/vllm_ascend/worker/worker_v1.py b/vllm_ascend/worker/worker_v1.py index fdfc3b0..da6c7d1 100644 --- a/vllm_ascend/worker/worker_v1.py +++ b/vllm_ascend/worker/worker_v1.py @@ -339,12 +339,13 @@ class NPUWorker(WorkerBase): if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD: # save memory to host with lock self.offload_vram() - self.reload_vram() + succ, _ = self.try_reload_vram() + assert succ, "Failed to reload model weights after offloading." def offload_vram(self) -> None: # free_bytes_before_offload = NPUPlatform.mem_get_info()[0] allocator = CaMemAllocator.get_instance() - allocator.offload_vram(offload_tags=("weights", )) + allocator.offload_vram(offload_tags=("weights",)) # free_bytes_after_offload, total = NPUPlatform.mem_get_info() # freed_bytes = free_bytes_after_offload - free_bytes_before_offload # used_bytes = total - free_bytes_after_offload @@ -354,9 +355,13 @@ class NPUWorker(WorkerBase): # "%.2f GiB memory is still in use.", freed_bytes / GiB_bytes, # used_bytes / GiB_bytes) - def reload_vram(self) -> bool: + def try_reload_vram(self) -> tuple[bool, bool]: allocator = CaMemAllocator.get_instance() - return allocator.reload_vram(tags=None) + return allocator.try_reload_vram(tags=None) + + def vnpu_unlock_gpu(self) -> None: + allocator = CaMemAllocator.get_instance() + allocator.vnpu_unlock_gpu() def compile_or_warm_up_model(self) -> None: # Note: need to adapt for graph mode.