Partially support multiple NPUs

This commit is contained in:
starkwj
2026-01-08 06:54:33 +00:00
parent fa0fb46853
commit 2a571d8bc8
12 changed files with 331 additions and 160 deletions

View File

@@ -30,6 +30,5 @@ docker build -t vllm-ascend-multi-llm:latest -f ./Dockerfile .
## Limitations
- This project currently only supports sharing a single NPU. This is also limited by the fact that HCCL cannot be shared.
- Because HCCL cannot be shared, deploying more than one model with multi-GPU parallelism (e.g., TP) is currently not feasible.
- The prefix cache will be reset when the LLM is restored, since we simply discard the KV cache when the LLM is offloaded.

View File

@@ -20,6 +20,7 @@
#include <atomic>
#include "idle_offload/shm_worker.h"
#include "idle_offload/npu_helper.h"
extern "C" {
@@ -474,8 +475,10 @@ static PyObject* python_create_and_map(PyObject* self, PyObject* args) {
static PyObject* py_init_module_offload(PyObject* self, PyObject* args) {
PyObject* malloc_callback = nullptr;
PyObject* free_callback = nullptr;
unsigned long long device = 0;
if (!PyArg_ParseTuple(args, "OO", &malloc_callback, &free_callback)) {
if (!PyArg_ParseTuple(args, "OOK", &malloc_callback, &free_callback,
&device)) {
return nullptr;
}
@@ -497,7 +500,13 @@ static PyObject* py_init_module_offload(PyObject* self, PyObject* args) {
}
g_initialized.store(true);
shm_worker = new ShmWorker();
std::vector<int> gpu_ids = get_npu_ids();
if (device >= gpu_ids.size()) {
throw std::runtime_error("Invalid device id: " + std::to_string(device) +
" " + __FILE__ + ":" + std::to_string(__LINE__));
}
int gpu_id = gpu_ids[device];
// get pid
aclError error_code;
int32_t pid;
@@ -508,11 +517,12 @@ static PyObject* py_init_module_offload(PyObject* self, PyObject* args) {
std::to_string(error_code) + " " + __FILE__ + ":" +
std::to_string(__LINE__));
}
shm_worker = new ShmWorker();
uint64_t shareable_handle;
shm_worker->register_worker(pid, &shareable_handle, &g_size);
shm_worker->register_worker(pid, gpu_id, &shareable_handle, &g_size);
// import shareable handle
uint32_t device = 0;
aclrtDrvMemHandle memHandle;
error_code =
aclrtMemImportFromShareableHandle(shareable_handle, device, &memHandle);
@@ -570,9 +580,16 @@ static PyObject* python_get_mem_info_offload(PyObject* self, PyObject* args) {
return tuple;
}
static PyObject* python_lock_gpu_offload(PyObject* self, PyObject* args) {
bool prev_is_self = shm_worker->lock_gpu();
return PyBool_FromLong(prev_is_self);
// Python binding: non-blocking attempt to take the shared GPU lock.
// Returns a 2-tuple (success, prev_is_self): whether the lock was acquired,
// and whether this process was also the previous holder of the lock.
static PyObject* python_try_lock_gpu_offload(PyObject* self, PyObject* args) {
  bool prev_is_self = false;
  const bool success = shm_worker->try_lock_gpu(prev_is_self);
  // "N" steals the new references produced by PyBool_FromLong, so no manual
  // tuple bookkeeping is needed; Py_BuildValue returns nullptr on failure,
  // which matches the original error path.
  return Py_BuildValue("(NN)", PyBool_FromLong(success),
                       PyBool_FromLong(prev_is_self));
}
static PyObject* python_unlock_gpu_offload(PyObject* self, PyObject* args) {
@@ -597,7 +614,7 @@ static PyMethodDef module_methods[] = {
"Unmap and release memory on the device."},
{"python_get_mem_info_offload", (PyCFunction)python_get_mem_info_offload,
METH_NOARGS, "Get mem info in the reserved pool."},
{"python_lock_gpu_offload", (PyCFunction)python_lock_gpu_offload,
{"python_try_lock_gpu_offload", (PyCFunction)python_try_lock_gpu_offload,
METH_NOARGS, "Lock GPU."},
{"python_unlock_gpu_offload", (PyCFunction)python_unlock_gpu_offload,
METH_NOARGS, "Unlock GPU."},

View File

@@ -0,0 +1,86 @@
#include <vector>
#include <string>
#include <stdint.h>
#include <filesystem>
#include <algorithm>
#include "spdlog/spdlog.h"
#include "acl/acl.h"
// Enumerate NPU character devices by scanning /dev for entries named
// "davinci<N>" where <N> is all digits. Returns the device numbers <N>
// sorted ascending; returns an empty list if /dev is missing or a
// filesystem error occurs (the error is logged).
std::vector<int> get_available_devices() {
  namespace fs = std::filesystem;
  std::vector<int> found;
  const std::string prefix = "davinci";
  if (!fs::exists("/dev")) {
    return found;
  }
  try {
    for (const auto &entry : fs::directory_iterator("/dev")) {
      const std::string name = entry.path().filename().string();
      if (name.rfind(prefix, 0) != 0) {
        continue;  // not a davinci* entry
      }
      const std::string digits = name.substr(prefix.length());
      // Accept only an all-digit suffix (excludes e.g. "davinci_manager").
      const bool numeric =
          !digits.empty() &&
          std::all_of(digits.begin(), digits.end(),
                      [](unsigned char c) { return std::isdigit(c); });
      if (!numeric) {
        continue;
      }
      try {
        found.push_back(std::stoi(digits));
      } catch (...) {
        // Suffix does not fit in an int — ignore this entry.
      }
    }
  } catch (const fs::filesystem_error &e) {
    spdlog::error("Error accessing /dev: {}", e.what());
  }
  std::sort(found.begin(), found.end());
  return found;
}
std::vector<int> get_npu_ids() {
std::vector<int> available_devices = get_available_devices();
std::vector<int> npu_ids;
uint32_t device_count = 0;
aclError error_code = aclrtGetDeviceCount(&device_count);
if (error_code != 0) {
spdlog::error("Failed to get NPU device count, error code: {}", error_code);
throw std::runtime_error("Failed to get NPU device count");
}
if (device_count > available_devices.size()) {
spdlog::error("The number of available NPU devices ({}) is less than the "
"number of devices reported by ACL ({}).",
available_devices.size(), device_count);
throw std::runtime_error("Inconsistent NPU device count");
}
const char *env_available_npu = getenv("ASCEND_RT_VISIBLE_DEVICES");
if (env_available_npu) {
std::string npu_str(env_available_npu);
size_t start = 0;
while (start < npu_str.size()) {
size_t next = npu_str.find(',', start);
if (next == std::string::npos) {
next = npu_str.size();
}
int device_id = std::stoi(npu_str.substr(start, next - start));
npu_ids.push_back(available_devices[device_id]);
start = next + 1;
if (npu_ids.size() >= device_count) {
break;
}
}
} else {
npu_ids.insert(npu_ids.end(), available_devices.begin(),
available_devices.begin() + device_count);
}
return npu_ids;
}

View File

@@ -13,6 +13,7 @@
#include "acl/acl.h"
#include "shm_manager.h"
#include "npu_helper.h"
#include "spdlog/spdlog.h"
@@ -49,8 +50,6 @@ void ensure_context(unsigned long long device) {
void init_acl() {
int32_t deviceId=0;
// aclrtStream stream;
bool g_isDevice;
aclError ret = aclrtSetDevice(deviceId);
if (ret != ACL_ERROR_NONE) {
@@ -59,7 +58,8 @@ void init_acl() {
}
}
void reset_pids(const std::vector<int32_t> &pids, uint64_t shareable_handle) {
void reset_pids(const std::vector<int32_t> &pids,
const std::vector<uint64_t> &shareable_handles) {
int cnt = pids.size();
if (cnt <= 0) {
return;
@@ -68,21 +68,21 @@ void reset_pids(const std::vector<int32_t> &pids, uint64_t shareable_handle) {
int32_t pids_data[cnt];
memcpy(pids_data, pids.data(), cnt * sizeof(int32_t));
aclError error_code =
aclrtMemSetPidToShareableHandle(shareable_handle, pids_data, cnt);
if (error_code != 0) {
spdlog::error("aclrtMemSetPidToShareableHandle failed, error_code: {}",
error_code);
throw std::runtime_error("aclrtMemSetPidToShareableHandle failed");
} else {
spdlog::info("aclrtMemSetPidToShareableHandle succeeded, num_pids: {}",
cnt);
for (int i = 0; i < shareable_handles.size(); ++i) {
uint64_t shareable_handle = shareable_handles[i];
aclError error_code =
aclrtMemSetPidToShareableHandle(shareable_handle, pids_data, cnt);
if (error_code != 0) {
spdlog::error("aclrtMemSetPidToShareableHandle failed, error_code: {}",
error_code);
throw std::runtime_error("aclrtMemSetPidToShareableHandle failed");
}
}
spdlog::info("aclrtMemSetPidToShareableHandle succeeded, num_pids: {}", cnt);
}
void start_daemon() {
init_acl();
void alloc_physical(uint32_t device_id, aclrtDrvMemHandle &out_mem_handle,
size_t &out_g_size) {
aclError error_code;
size_t free_mem = 0, total = 0;
error_code = aclrtGetMemInfo(ACL_HBM_MEM, &free_mem, &total);
@@ -94,12 +94,11 @@ void start_daemon() {
total);
}
uint32_t device = 0;
aclrtPhysicalMemProp prop = {};
prop.handleType = ACL_MEM_HANDLE_TYPE_NONE;
prop.allocationType = ACL_MEM_ALLOCATION_TYPE_PINNED;
prop.memAttr = ACL_HBM_MEM_HUGE;
prop.location.id = device;
prop.location.id = device_id;
prop.location.type = ACL_MEM_LOCATION_TYPE_DEVICE;
prop.reserve = 0;
@@ -107,7 +106,8 @@ void start_daemon() {
error_code = aclrtMemGetAllocationGranularity(
&prop, ACL_RT_MEM_ALLOC_GRANULARITY_MINIMUM, &granularity);
if (error_code != 0) {
spdlog::error("aclrtMemGetAllocationGranularity failed, error_code: {}", error_code);
spdlog::error("aclrtMemGetAllocationGranularity failed, error_code: {}",
error_code);
throw std::runtime_error("aclrtMemGetAllocationGranularity failed");
} else {
spdlog::info("aclrtMemGetAllocationGranularity succeeded, granularity: {}",
@@ -118,59 +118,68 @@ void start_daemon() {
reserved_mem_size, free_mem);
throw std::runtime_error("Not enough free memory to reserve");
}
size_t g_size = free_mem - reserved_mem_size;
g_size = (g_size / granularity) * granularity;
out_g_size = free_mem - reserved_mem_size;
out_g_size = (out_g_size / granularity) * granularity;
// allocate physical memory
aclrtDrvMemHandle mem_handle;
error_code = aclrtMallocPhysical(&mem_handle, g_size, &prop, 0);
error_code = aclrtMallocPhysical(&out_mem_handle, out_g_size, &prop, 0);
if (error_code != 0) {
spdlog::error("aclrtMallocPhysical failed, error_code: {}", error_code);
throw std::runtime_error("aclrtMallocPhysical failed");
} else {
spdlog::info("aclrtMallocPhysical succeeded, size: {}", g_size);
spdlog::info("device {} aclrtMallocPhysical succeeded, size: {}", device_id,
out_g_size);
}
}
// // reserve address
// void *vmem_addr = nullptr;
// error_code = aclrtReserveMemAddress(&vmem_addr, g_size, 0, nullptr, 0);
// if (error_code != 0) {
// spdlog::error("aclrtReserveMemAddress failed, error_code: {}", error_code);
// throw std::runtime_error("aclrtReserveMemAddress failed");
// } else {
// spdlog::info("aclrtReserveMemAddress succeeded, vmem_addr: {}", vmem_addr);
// }
// // map
// error_code = aclrtMapMem(vmem_addr, g_size, 0, mem_handle, 0);
// if (error_code != 0) {
// spdlog::error("aclrtMapMem failed, error_code: {}", error_code);
// throw std::runtime_error("aclrtMapMem failed");
// } else {
// spdlog::info("aclrtMapMem succeeded, vmem_addr: {}", vmem_addr);
// }
// export
uint64_t shareable_handle;
error_code = aclrtMemExportToShareableHandle(
mem_handle, ACL_MEM_HANDLE_TYPE_NONE, ACL_RT_VMM_EXPORT_FLAG_DEFAULT,
&shareable_handle);
if (error_code != 0) {
spdlog::error("aclrtMemExportToShareableHandle failed, error_code: {}",
error_code);
throw std::runtime_error("aclrtMemExportToShareableHandle failed");
} else {
spdlog::info(
"aclrtMemExportToShareableHandle succeeded, shareable_handle: {}",
shareable_handle);
}
void start_daemon() {
init_acl();
std::vector<int> npu_ids = get_npu_ids();
std::vector<aclrtDrvMemHandle> mem_handles;
std::vector<uint64_t> shareable_handles;
// shm
shm_manager = new ShmManager();
shm_manager->set_gpu_info(g_size, shareable_handle);
for (int i = 0; i < npu_ids.size(); ++i) {
uint32_t device_id = i;
int npu_id = npu_ids[i];
spdlog::info("Setting up device id {} - npu id {}", device_id, npu_id);
aclError error_code = aclrtSetDevice(device_id);
if (error_code != ACL_ERROR_NONE) {
throw std::runtime_error("aclrtSetDevice failed with acl error code: " +
std::to_string(error_code) + " " + __FILE__ +
":" + std::to_string(__LINE__));
}
// alloc physical
aclrtDrvMemHandle mem_handle;
size_t g_size;
alloc_physical(device_id, mem_handle, g_size);
mem_handles.push_back(mem_handle);
// export
uint64_t shareable_handle;
error_code = aclrtMemExportToShareableHandle(
mem_handle, ACL_MEM_HANDLE_TYPE_NONE, ACL_RT_VMM_EXPORT_FLAG_DEFAULT,
&shareable_handle);
if (error_code != 0) {
spdlog::error("aclrtMemExportToShareableHandle failed, error_code: {}",
error_code);
throw std::runtime_error("aclrtMemExportToShareableHandle failed");
} else {
spdlog::info(
"aclrtMemExportToShareableHandle succeeded, shareable_handle: {}",
shareable_handle);
}
shm_manager->set_gpu_info(npu_id, g_size, shareable_handle);
shareable_handles.push_back(shareable_handle);
}
shm_manager->register_callback_on_worker_change(
[&](const std::vector<int32_t> &pids) {
reset_pids(pids, shareable_handle);
reset_pids(pids, shareable_handles);
});
// start busy loop
@@ -181,10 +190,12 @@ void start_daemon() {
shm_manager = nullptr;
// free physical memory
error_code = aclrtFreePhysical(mem_handle);
if (error_code != 0) {
spdlog::error("aclrtFreePhysical failed, error_code: {}", error_code);
throw std::runtime_error("aclrtFreePhysical failed");
for (auto mem_handle : mem_handles) {
aclError error_code = aclrtFreePhysical(mem_handle);
if (error_code != 0) {
spdlog::error("aclrtFreePhysical failed, error_code: {}", error_code);
throw std::runtime_error("aclrtFreePhysical failed");
}
}
}

View File

@@ -19,6 +19,7 @@
#define MAX_WORKERS 60
#define MAX_DEVICES 16
// static constexpr const char *SHM_NAME = "/vllm_acl_vnpu_offload_shm";
static inline std::string get_shm_name() {
const char *env_shm_name = getenv("VLLM_IDLE_OFFLOAD_SHM_NAME");
@@ -69,11 +70,14 @@ static inline uint64_t pack_unlocked_tgid(int32_t tgid) {
// mmap usually page-aligned
struct alignas(64) ShmHelper {
struct VramInfo {
uint64_t total_vmem_size;
uint64_t shareable_handle;
};
VramInfo vram_info[MAX_DEVICES]; // support max 16 NPUs
// GPU lock flag
std::atomic<uint64_t> gpu_flag;
uint64_t total_vmem_size;
uint64_t shareable_handle;
uint8_t _padding[64 - sizeof(std::atomic<uint64_t>) - 2 * sizeof(uint64_t)];
std::atomic<uint64_t> gpu_flag[MAX_DEVICES];
// uint8_t _padding1[64 - sizeof(std::atomic<uint64_t>)];
// request
enum RequestType: uint32_t {
@@ -105,14 +109,16 @@ struct alignas(64) ShmHelper {
WorkerHeartBeat heart_beats[MAX_WORKERS];
void init() {
gpu_flag.store(0, std::memory_order_release);
memset(vram_info, 0, sizeof(vram_info));
for (size_t i = 0; i < MAX_DEVICES; ++i) {
gpu_flag[i].store(0, std::memory_order_release);
}
req_ready.store(READY_STATE_NO_REQUEST, std::memory_order_release);
}
void set_gpu_info(uint64_t vmem_size, uint64_t shared_handle) {
total_vmem_size = vmem_size;
shareable_handle = shared_handle;
init();
void set_gpu_info(int gpu_id, uint64_t vmem_size, uint64_t shared_handle) {
vram_info[gpu_id].total_vmem_size = vmem_size;
vram_info[gpu_id].shareable_handle = shared_handle;
}
};

View File

@@ -1,7 +1,6 @@
#include "shm_manager.h"
#include <algorithm>
// === ShmManager ===
ShmManager::ShmManager() {
std::string shm_name = get_shm_name();
@@ -37,11 +36,12 @@ ShmManager::~ShmManager() {
shm_unlink(shm_name.c_str());
}
void ShmManager::set_gpu_info(uint64_t vmem_size, uint64_t shared_handle) {
shm_helper->set_gpu_info(vmem_size, shared_handle);
void ShmManager::set_gpu_info(int gpu_id, uint64_t vmem_size,
uint64_t shared_handle) {
shm_helper->set_gpu_info(gpu_id, vmem_size, shared_handle);
this->valid_gpu_ids.push_back(gpu_id);
}
void ShmManager::run_busy_loop() {
if (!cb_on_worker_change) {
spdlog::error("cb_on_worker_change is not set");
@@ -155,14 +155,17 @@ void ShmManager::check_heart_beats() {
shm_helper->heart_beats[i].timestamp.store(0,
std::memory_order_release);
// check dead lock
uint64_t gpu_flag =
shm_helper->gpu_flag.load(std::memory_order_acquire);
if (unpack_lock_field(gpu_flag) == 1 &&
unpack_tgid_field(gpu_flag) == tgid) {
// release lock held by dead worker
spdlog::warn("Releasing GPU lock held by dead worker TGID {}", tgid);
shm_helper->gpu_flag.store(pack_unlocked_tgid(tgid),
std::memory_order_release);
for (int gpu_id : valid_gpu_ids) {
uint64_t gpu_flag =
shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
if (unpack_lock_field(gpu_flag) == 1 &&
unpack_tgid_field(gpu_flag) == tgid) {
// release lock held by dead worker
spdlog::warn("Releasing GPU {} lock held by dead worker TGID {}",
gpu_id, tgid);
shm_helper->gpu_flag[gpu_id].store(pack_unlocked_tgid(tgid),
std::memory_order_release);
}
}
local_worker_tgids[i] = 0;
alive_worker_tgids.erase(std::remove(alive_worker_tgids.begin(),

View File

@@ -7,7 +7,7 @@ class ShmManager {
ShmManager();
~ShmManager();
void set_gpu_info(uint64_t vmem_size, uint64_t shared_handle);
void set_gpu_info(int gpu_id, uint64_t vmem_size, uint64_t shared_handle);
// request
void process_requests();
@@ -29,6 +29,7 @@ class ShmManager {
ShmHelper *shm_helper;
std::vector<int32_t> local_worker_tgids;
std::vector<int32_t> alive_worker_tgids;
std::vector<int> valid_gpu_ids;
std::atomic<bool> stop_loop_flag;
std::function<void(const std::vector<int32_t> &)> cb_on_worker_change;
};

View File

@@ -1,6 +1,5 @@
#include "shm_worker.h"
// === ShmWorker ===
ShmWorker::ShmWorker() {
std::string shm_name = get_shm_name();
@@ -29,16 +28,22 @@ ShmWorker::~ShmWorker() {
munmap(shm_helper, SHM_SIZE);
}
bool ShmWorker::register_worker(int32_t tgid, uint64_t *out_shareable_handle,
bool ShmWorker::register_worker(int32_t tgid, int gpu_id,
uint64_t *out_shareable_handle,
uint64_t *out_vmem_size) {
if (gpu_id < 0 || gpu_id >= MAX_DEVICES) {
spdlog::error("Invalid GPU ID {}", gpu_id);
throw std::runtime_error("Invalid GPU ID");
}
this->tgid = tgid;
this->gpu_id = gpu_id;
int slot = register_worker_shm();
if (slot == -1) {
return false;
}
*out_shareable_handle = shm_helper->shareable_handle;
*out_vmem_size = shm_helper->total_vmem_size;
*out_shareable_handle = shm_helper->vram_info[gpu_id].shareable_handle;
*out_vmem_size = shm_helper->vram_info[gpu_id].total_vmem_size;
stop_heart_beat.store(false, std::memory_order_release);
heart_beat_thread = std::thread(&ShmWorker::heart_beat_loop, this, slot);
@@ -68,47 +73,62 @@ void ShmWorker::heart_beat_loop(int slot) {
}
}
bool ShmWorker::lock_gpu() {
int retry_cnt = 0;
uint64_t old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire);
// Single non-blocking attempt to acquire the per-device GPU lock stored in
// shared memory (shm_helper->gpu_flag[gpu_id]).
// On success returns true and sets out_self_hold to whether this worker's
// tgid was also the previous holder recorded in the flag.
// On failure returns false, sets out_self_hold to false, and periodically
// logs who currently holds the lock.
bool ShmWorker::try_lock_gpu(bool &out_self_hold) {
// NOTE(review): function-local static — this retry counter is shared by all
// callers in the process and is not thread-safe; it only throttles logging,
// so a lost update is benign. Confirm single-threaded use if that matters.
static int retry_cnt = 0;
uint64_t old_flag =
shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
if (unpack_lock_field(old_flag) == 0) { // free
uint64_t new_flag = pack_locked_tgid(tgid);
// CAS so only one contender can flip the flag from unlocked to
// locked-by-tgid; a (possibly spurious) weak-CAS failure just reports
// "busy" and the caller may retry.
if (shm_helper->gpu_flag[gpu_id].compare_exchange_weak(
old_flag, new_flag, std::memory_order_acq_rel,
std::memory_order_acquire)) {
spdlog::info("TGID {} acquired GPU {} lock", tgid, gpu_id);
// The unlocked flag still carries the tgid of the previous holder
// (see pack_unlocked_tgid), so we can tell the caller whether it is
// re-acquiring a lock it released earlier.
int32_t prev_tgid = unpack_tgid_field(old_flag);
out_self_hold = prev_tgid == tgid;
retry_cnt = 0;
return true;
}
} else { // locked
if (unpack_tgid_field(old_flag) == tgid) {
// This worker already holds the lock: treat as success (reentrant).
spdlog::info("TGID {} already holds the GPU {} lock", tgid, gpu_id);
out_self_hold = true;
retry_cnt = 0;
return true;
}
}
// failed
// Throttled diagnostics: report the current holder once per 2000 attempts.
if (++retry_cnt % 2000 == 0) {
spdlog::info(
"TGID {} trying to acquire GPU {} lock, current lock holder TGID {}",
tgid, gpu_id, unpack_tgid_field(old_flag));
}
out_self_hold = false;
return false;
}
bool ShmWorker::lock_gpu(bool &out_self_hold) {
while (true) {
if (unpack_lock_field(old_flag) == 0) {
uint64_t new_flag = pack_locked_tgid(tgid);
if (shm_helper->gpu_flag.compare_exchange_weak(old_flag, new_flag,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
spdlog::info("TGID {} acquired GPU lock", tgid);
int32_t old_tgid = unpack_tgid_field(old_flag);
return old_tgid == tgid;
}
} else {
if (unpack_tgid_field(old_flag) == tgid) {
spdlog::info("TGID {} already holds the GPU lock", tgid);
return true;
}
if (try_lock_gpu(out_self_hold)) {
return true;
}
// failed
++retry_cnt;
if (retry_cnt % 1000 == 0) {
spdlog::info(
"TGID {} waiting for GPU lock, current lock holder TGID {}", tgid,
unpack_tgid_field(old_flag));
}
usleep(1000);
old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire);
}
}
void ShmWorker::unlock_gpu() {
uint64_t old_flag = shm_helper->gpu_flag.load(std::memory_order_acquire);
uint64_t old_flag =
shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
if (unpack_tgid_field(old_flag) != tgid) {
spdlog::warn("previous gpu flag {} does not match expected locked flag for "
"TGID {}. This may be a bug, unless during startup.",
old_flag, tgid);
// spdlog::warn("previous gpu flag {} does not match expected locked flag for "
// "TGID {}. This may be a bug, unless during startup.",
// old_flag, tgid);
spdlog::info("TGID {} does not hold GPU {} lock", tgid, gpu_id);
} else {
uint64_t new_flag = pack_unlocked_tgid(tgid);
shm_helper->gpu_flag.store(new_flag, std::memory_order_release);
spdlog::info("TGID {} released GPU lock", tgid);
shm_helper->gpu_flag[gpu_id].store(new_flag, std::memory_order_release);
spdlog::info("TGID {} released GPU {} lock", tgid, gpu_id);
}
}
@@ -142,7 +162,7 @@ uint64_t ShmWorker::make_request(uint32_t type, uint64_t parameter) {
uint64_t response = shm_helper->request.response;
// set ready to 0
shm_helper->req_ready.store(ShmHelper::READY_STATE_NO_REQUEST,
std::memory_order_release);
std::memory_order_release);
return response;
}

View File

@@ -1,21 +1,27 @@
#pragma once
#include <vector>
#include <atomic>
#include <thread>
#include "shm_helper.h"
class ShmWorker {
public:
ShmWorker();
~ShmWorker();
bool register_worker(int32_t tgid, uint64_t *out_shareable_handle,
bool register_worker(int32_t tgid, int gpu_id, uint64_t *out_shareable_handle,
uint64_t *out_vmem_size);
bool lock_gpu();
bool try_lock_gpu(bool &out_self_hold);
bool lock_gpu(bool &out_self_hold);
void unlock_gpu();
private:
int32_t tgid;
int gpu_id;
ShmHelper *shm_helper;
std::thread heart_beat_thread;
std::atomic<bool> stop_heart_beat;

View File

@@ -63,14 +63,14 @@ try:
init_module_offload as init_module,
python_create_and_map_offload as python_create_and_map,python_unmap_and_release_offload as python_unmap_and_release,
python_get_mem_info_offload as python_get_mem_info,
python_lock_gpu_offload as python_lock_gpu,
python_try_lock_gpu_offload as python_try_lock_gpu,
python_unlock_gpu_offload as python_unlock_gpu
)
else:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module, python_create_and_map, python_unmap_and_release)
python_get_mem_info = None
python_lock_gpu = None
python_try_lock_gpu = None
python_unlock_gpu = None
lib_name = find_loaded_library("vllm_ascend_C")
camem_available = True
@@ -81,7 +81,7 @@ except ImportError as e:
python_create_and_map = None
python_unmap_and_release = None
python_get_mem_info = None
python_lock_gpu = None
python_try_lock_gpu = None
python_unlock_gpu = None
lib_name = None
libcudart = None
@@ -109,12 +109,14 @@ def get_pluggable_allocator(
python_malloc_fn: Callable[[tuple[int, int, int, int]], None],
python_free_func: Callable[[int], tuple[int, int, int, int]]
) -> torch.npu.memory.NPUPluggableAllocator:
init_module(python_malloc_fn, python_free_func)
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
current_device = torch.npu.current_device()
init_module(python_malloc_fn, python_free_func, current_device)
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc_offload', 'my_free_offload'
)
else:
init_module(python_malloc_fn, python_free_func)
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc', 'my_free'
)
@@ -280,7 +282,7 @@ class CaMemAllocator:
self.allocator_and_pools[tag] = data
# lock gpu
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
self.vnpu_lock_gpu()
self._vnpu_lock_gpu()
yield
# PyTorch's bug, calling torch.cuda.empty_cache() will error
# when using pluggable allocator, see
@@ -306,12 +308,18 @@ class CaMemAllocator:
sum_bytes += handle[1]
return sum_bytes
def vnpu_lock_gpu(self) -> bool:
if python_lock_gpu:
return python_lock_gpu()
def vnpu_try_lock_gpu(self) -> tuple[bool, bool]:
if python_try_lock_gpu:
return python_try_lock_gpu()
else:
return False
return False, False
# Block until this worker acquires the vNPU GPU lock.
# Spins on vnpu_try_lock_gpu() with a 1 ms sleep between attempts and only
# returns once the lock is held, so the return value is always True.
def _vnpu_lock_gpu(self) -> bool:
while True:
success, _ = self.vnpu_try_lock_gpu()
if success:
return True
time.sleep(0.001)
def vnpu_unlock_gpu(self):
@@ -373,15 +381,15 @@ class CaMemAllocator:
self.vnpu_unlock_gpu()
# logger.info(f"offload: tags {offload_tags}: {sz_weights/(1024**3):.2f} GB, discard kv cache: {sz_kvcache/(1024**3):.2f} GB")
def reload_vram(self, tags: Optional[list[str]] = None) -> bool:
"""
Wake up the allocator from sleep mode.
All data that is previously offloaded will be loaded back to GPU
memory, and the rest of the data will have empty memory."""
prev_is_self = self.vnpu_lock_gpu()
def try_reload_vram(self, tags: Optional[list[str]] = None) -> tuple[bool, bool]:
succ, prev_is_self = self.vnpu_try_lock_gpu()
if not succ:
# not get the lock
return False, prev_is_self
if prev_is_self:
# nothing to do
return True
return succ, prev_is_self
for ptr, data in self.pointer_to_data.items():
handle = data.handle
@@ -401,4 +409,4 @@ class CaMemAllocator:
# else:
# size_in_bytes = handle[1]
# memset(ptr, size_in_bytes, 0, size_in_bytes)
return False
return succ, prev_is_self

View File

@@ -26,12 +26,21 @@ def reload_vram(self) -> bool:
logger.warning("Executor is not offloaded.")
return True
time_before_reload = time.perf_counter()
prev_is_self = self.collective_rpc("reload_vram")
time_after_reload = time.perf_counter()
self.is_offloaded = False
logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
return all(prev_is_self)
while True:
time_before_reload = time.perf_counter()
res = self.collective_rpc("try_reload_vram")
time_after_reload = time.perf_counter()
succ = all(x[0] for x in res)
if succ:
self.is_offloaded = False
logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
prev_is_self = all(x[1] for x in res)
return prev_is_self
else:
# some workers not get lock
self.collective_rpc("vnpu_unlock_gpu")
time.sleep(0.001)
def determine_available_memory_idle_offload_mode(self) -> int:

View File

@@ -339,12 +339,13 @@ class NPUWorker(WorkerBase):
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
# save memory to host with lock
self.offload_vram()
self.reload_vram()
succ, _ = self.try_reload_vram()
assert succ, "Failed to reload model weights after offloading."
def offload_vram(self) -> None:
# free_bytes_before_offload = NPUPlatform.mem_get_info()[0]
allocator = CaMemAllocator.get_instance()
allocator.offload_vram(offload_tags=("weights", ))
allocator.offload_vram(offload_tags=("weights",))
# free_bytes_after_offload, total = NPUPlatform.mem_get_info()
# freed_bytes = free_bytes_after_offload - free_bytes_before_offload
# used_bytes = total - free_bytes_after_offload
@@ -354,9 +355,13 @@ class NPUWorker(WorkerBase):
# "%.2f GiB memory is still in use.", freed_bytes / GiB_bytes,
# used_bytes / GiB_bytes)
def reload_vram(self) -> bool:
def try_reload_vram(self) -> tuple[bool, bool]:
allocator = CaMemAllocator.get_instance()
return allocator.reload_vram(tags=None)
return allocator.try_reload_vram(tags=None)
def vnpu_unlock_gpu(self) -> None:
allocator = CaMemAllocator.get_instance()
allocator.vnpu_unlock_gpu()
def compile_or_warm_up_model(self) -> None:
# Note: need to adapt for graph mode.