#include "shm_worker.h"

// Read the worker priority from the VLLM_VNPU_PRIORITY environment variable.
// Valid range is [0, 7]; out-of-range values and parse failures fall back to 0.
static inline uint16_t get_shm_priority() {
  const char *env_priority = getenv("VLLM_VNPU_PRIORITY");
  if (env_priority) {
    try {
      int p = std::stoi(env_priority);
      if (p >= 0 && p <= 7) {
        // FIX: restored missing template argument (was `static_cast(p)`,
        // which does not compile).
        return static_cast<uint16_t>(p);
      } else {
        spdlog::warn("VLLM_VNPU_PRIORITY should be between 0 and 7, got {}. Using default 0.", p);
      }
    } catch (...) {
      // std::stoi throws invalid_argument / out_of_range on bad input.
      spdlog::warn("Invalid VLLM_VNPU_PRIORITY format. Using default 0.");
    }
  }
  return 0;
}

// Attach to the daemon-owned shared memory segment and initialize local state.
// Throws std::runtime_error if the segment cannot be opened or mapped
// (typically because the daemon is not running).
ShmWorker::ShmWorker() {
  this->priority = get_shm_priority();
  this->waiting_timestamp = 0;
  this->is_waiting = false;
  this->is_holding_lock = false;
  spdlog::info("vNPU worker initialized with priority {}", priority);

  std::string shm_name = get_shm_name();
  int shm_fd = shm_open(shm_name.c_str(), O_RDWR, 0666);
  if (shm_fd == -1) {
    spdlog::error("Failed to open shared memory segment. Maybe the daemon is "
                  "not started.");
    throw std::runtime_error("Failed to open shared memory segment");
  }
  void *ptr = mmap(nullptr, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
                   shm_fd, 0);
  if (ptr == MAP_FAILED) {
    // FIX: close the descriptor before throwing — the original leaked shm_fd
    // on this error path.
    close(shm_fd);
    spdlog::error("Failed to map shared memory segment");
    throw std::runtime_error("Failed to map shared memory segment");
  }
  // The fd is no longer needed once the mapping exists; the mapping survives
  // the close.
  close(shm_fd);
  // FIX: restored missing template argument. NOTE(review): assumes the
  // shm_helper member is declared as ShmHelper* in shm_worker.h — confirm.
  shm_helper = static_cast<ShmHelper *>(ptr);
}

// Stop the heartbeat thread (if it was ever started) and unmap the segment.
ShmWorker::~ShmWorker() {
  stop_heart_beat.store(true, std::memory_order_release);
  // FIX: join() on a non-joinable thread throws std::system_error (and a
  // throwing destructor terminates the process). The thread is only started
  // in register_worker(), which callers may never have invoked.
  if (heart_beat_thread.joinable()) {
    heart_beat_thread.join();
  }
  munmap(shm_helper, SHM_SIZE);
}

// Register this process as a worker for `gpu_id` with the daemon.
// On success, writes the device's shareable vmem handle/size to the out
// parameters, starts the heartbeat thread, and returns true.
// Returns false if the daemon reports no free slot; throws on invalid gpu_id.
bool ShmWorker::register_worker(int32_t tgid, int gpu_id,
                                uint64_t *out_shareable_handle,
                                uint64_t *out_vmem_size) {
  if (gpu_id < 0 || gpu_id >= MAX_DEVICES) {
    spdlog::error("Invalid GPU ID {}", gpu_id);
    throw std::runtime_error("Invalid GPU ID");
  }
  this->tgid = tgid;
  this->gpu_id = gpu_id;
  int slot = register_worker_shm();
  if (slot == -1) {
    return false;
  }
  this->shm_slot = slot;
  *out_shareable_handle = shm_helper->vram_info[gpu_id].shareable_handle;
  *out_vmem_size = shm_helper->vram_info[gpu_id].total_vmem_size;
  stop_heart_beat.store(false, std::memory_order_release);
  heart_beat_thread = std::thread(&ShmWorker::heart_beat_loop, this);
  return true;
}

// Background loop: periodically stamp our heartbeat slot so the daemon knows
// this worker is alive. Re-registers if the daemon reclaimed our slot.
void ShmWorker::heart_beat_loop() {
  int slot = this->shm_slot;
  while (!stop_heart_beat.load(std::memory_order_acquire)) {
    // Detect slot reclamation: the daemon may have handed our slot to another
    // process, in which case the stored TGID no longer matches ours.
    int32_t shm_tgid =
        shm_helper->heart_beats[slot].tgid.load(std::memory_order_acquire);
    if (shm_tgid != tgid) {
      spdlog::error(
          "Maybe bug: Heart beat slot {} TGID mismatch (local: {}, shm: {})",
          slot, tgid, shm_tgid);
      // re-register
      slot = register_worker_shm();
      if (slot == -1) {
        spdlog::error("TGID {} failed to re-register as worker", tgid);
        // NOTE(review): an exception escaping a thread entry point calls
        // std::terminate — this is a deliberate fail-fast, but worth
        // confirming that killing the whole process is the intended policy.
        throw std::runtime_error("Failed to re-register as worker");
      }
      this->shm_slot = slot;
    }
    uint64_t now = heartbeat_ts_us();
    shm_helper->heart_beats[slot].timestamp.store(now,
                                                  std::memory_order_release);
    usleep(heartbeat_us);
  }
}

// Publish a "waiting" flag for this worker so other workers can yield to us.
// Idempotent: repeated calls keep the original (older) waiting timestamp.
void ShmWorker::start_wait() {
  if (is_waiting)
    return; // Keep the older timestamp if already waiting
  // Use lower 24 bits of millisecond timestamp. FIX: restored missing
  // template argument; waiting_timestamp is compared as uint32_t below.
  waiting_timestamp =
      static_cast<uint32_t>((heartbeat_ts_us() / 1000) & 0xFFFFFF);
  uint64_t flag = pack_waiting_flag(this->gpu_id, this->priority,
                                    waiting_timestamp, this->tgid);
  shm_helper->waiting_worker_flags[this->shm_slot].store(
      flag, std::memory_order_release);
  is_waiting = true;
}

// Clear our "waiting" flag. No-op if we were not waiting.
void ShmWorker::cancel_wait() {
  if (!is_waiting)
    return;
  shm_helper->waiting_worker_flags[this->shm_slot].store(
      0, std::memory_order_release);
  is_waiting = false;
}

// Returns true if another worker waiting on the same GPU should go first:
// either it has strictly higher priority, or equal priority and it started
// waiting earlier (24-bit wrap-aware timestamp compare, TGID as tie-break).
bool ShmWorker::has_higher_priority_waiter() {
  for (int i = 0; i < MAX_WORKERS; ++i) {
    if (i == this->shm_slot)
      continue;
    uint64_t flag =
        shm_helper->waiting_worker_flags[i].load(std::memory_order_acquire);
    if (flag == 0)
      continue;
    if (unpack_waiting_device_id(flag) != this->gpu_id)
      continue;
    uint16_t other_prio = unpack_waiting_priority(flag);
    if (other_prio > this->priority) {
      return true; // Found a waiter with higher priority
    } else if (other_prio == this->priority) {
      if (this->is_holding_lock) {
        // doesn't need to yield to same priority waiters
        continue;
      }
      if (!this->is_waiting) {
        // an earlier waiter with the same priority
        return true;
      }
      uint32_t other_ts = unpack_waiting_timestamp_ms(flag);
      // Same priority, compare timestamps (handle 24-bit wrap-around)
      // Using 24-bit unsigned subtraction. If the difference is in the lower
      // half, my timestamp is greater (i.e., I started waiting later).
      uint32_t diff = (this->waiting_timestamp - other_ts) & 0xFFFFFF;
      if (diff > 0 && diff < 0x800000) {
        return true; // The other worker started waiting earlier
      } else if (diff == 0 && unpack_waiting_tgid(flag) < this->tgid) {
        // using tgid if timestamps happen to be exactly the same
        return true;
      }
    }
  }
  return false;
}

// Single non-blocking attempt to acquire the per-GPU lock.
// out_self_hold is set to true when this TGID already held (or last held)
// the lock. Returns true on acquisition (or re-entry), false otherwise.
bool ShmWorker::try_lock_gpu(bool &out_self_hold) {
  // NOTE(review): function-local static — shared across all ShmWorker
  // instances in this process and not synchronized; fine only if a process
  // has a single worker calling this from one thread. Confirm.
  static int retry_cnt = 0;
  uint64_t old_flag =
      shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
  if (unpack_lock_field(old_flag) == 0) { // free
    // Check priority: yield if there are higher priority waiters, or same
    // priority waiters who have waited longer.
    if (has_higher_priority_waiter()) {
      out_self_hold = false;
      return false;
    }
    uint64_t new_flag = pack_locked_tgid(tgid);
    // FIX: use compare_exchange_strong for this single-shot CAS — the weak
    // form may fail spuriously even when old_flag matches (harmless here,
    // but strong is the correct non-looped idiom).
    if (shm_helper->gpu_flag[gpu_id].compare_exchange_strong(
            old_flag, new_flag, std::memory_order_acq_rel,
            std::memory_order_acquire)) {
      // spdlog::info("TGID {} acquired GPU {} lock", tgid, gpu_id);
      int32_t prev_tgid = unpack_tgid_field(old_flag);
      out_self_hold = prev_tgid == tgid;
      retry_cnt = 0;
      this->is_holding_lock = true;
      return true;
    }
  } else { // locked
    if (unpack_tgid_field(old_flag) == tgid) {
      // Re-entrant: we already hold this lock.
      // spdlog::info("TGID {} already holds the GPU {} lock", tgid, gpu_id);
      out_self_hold = true;
      retry_cnt = 0;
      this->is_holding_lock = true;
      return true;
    }
  }
  // failed — log occasionally so a stuck holder is visible without spamming.
  if (++retry_cnt % 10000 == 0) {
    spdlog::info(
        "TGID {} trying to acquire GPU {} lock, current lock holder TGID {}",
        tgid, gpu_id, unpack_tgid_field(old_flag));
  }
  out_self_hold = false;
  return false;
}

// Blocking acquire: spin on try_lock_gpu with a 1 ms back-off.
bool ShmWorker::lock_gpu(bool &out_self_hold) {
  while (true) {
    if (try_lock_gpu(out_self_hold)) {
      return true;
    }
    // failed
    usleep(1000);
  }
}

// Release the per-GPU lock if we hold it. When keep_wait is true, our
// waiting flag is left in place (we intend to reacquire soon).
void ShmWorker::unlock_gpu(bool keep_wait) {
  if (!keep_wait) {
    cancel_wait();
  }
  // NOTE(review): load-then-store is not atomic, but only the holder writes
  // the unlock transition, so the check is a sanity guard rather than a CAS.
  uint64_t old_flag =
      shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
  if (unpack_tgid_field(old_flag) != tgid) {
    if (!keep_wait) {
      spdlog::info("unlock: TGID {} does not hold GPU {} lock", tgid, gpu_id);
    }
  } else {
    uint64_t new_flag = pack_unlocked_tgid(tgid);
    shm_helper->gpu_flag[gpu_id].store(new_flag, std::memory_order_release);
    // spdlog::info("TGID {} released GPU {} lock", tgid, gpu_id);
  }
  this->is_holding_lock = false;
}

// Synchronous request/response handshake with the daemon over shared memory.
// Claims the single request mailbox, writes the request, waits for the
// daemon to process it, and returns the response value.
uint64_t ShmWorker::make_request(uint32_t type, uint64_t parameter) {
  // Claim the mailbox: NO_REQUEST -> PREPARING_REQUEST. Weak CAS is fine
  // here because we loop; a spurious failure just retries after the sleep.
  while (true) {
    uint64_t expected = ShmHelper::READY_STATE_NO_REQUEST;
    if (shm_helper->req_ready.load(std::memory_order_acquire) ==
        ShmHelper::READY_STATE_NO_REQUEST) {
      // set ready to 1
      if (shm_helper->req_ready.compare_exchange_weak(
              expected, ShmHelper::READY_STATE_PREPARING_REQUEST,
              std::memory_order_acq_rel, std::memory_order_acquire)) {
        break;
      }
    }
    usleep(1000);
  }
  // prepare request
  shm_helper->request.type = type;
  shm_helper->request.tgid = tgid;
  shm_helper->request.parameter = parameter;
  // set ready
  shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_READY,
                              std::memory_order_release);
  // wait until processed
  while (shm_helper->req_ready.load(std::memory_order_acquire) !=
         ShmHelper::READY_STATE_REQUEST_PROCESSED) {
    usleep(1000);
  }
  // get response
  uint64_t response = shm_helper->request.response;
  // set ready to 0 — releases the mailbox for the next requester.
  shm_helper->req_ready.store(ShmHelper::READY_STATE_NO_REQUEST,
                              std::memory_order_release);
  return response;
}

// Ask the daemon for a heartbeat slot. Returns the slot index.
// Throws std::runtime_error if the daemon returns an invalid slot.
int ShmWorker::register_worker_shm() {
  uint64_t slot = make_request(ShmHelper::REQUEST_TYPE_REGISTER_WORKER, tgid);
  spdlog::info("TGID {} registered as worker in slot {}", tgid, slot);
  // FIX: restored missing template arguments in both casts below.
  if (slot == static_cast<uint64_t>(-1) || slot >= MAX_WORKERS) {
    spdlog::error("TGID {} failed to register as worker", tgid);
    throw std::runtime_error("Failed to register as worker");
  }
  return static_cast<int>(slot);
}