#include "shm_manager.h"

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <stdexcept>
#include <string>

namespace {

/// Cleans up after a failed setup step in ShmManager::ShmManager() and throws.
///
/// Closes the descriptor, unlinks the segment name, logs `what` together with the
/// errno text, then throws std::runtime_error(what). Without this, the descriptor
/// and the segment created by shm_open(O_CREAT) would leak, because the destructor
/// does not run for a constructor that throws.
[[noreturn]] void fail_shm_setup(int shm_fd, const std::string &shm_name, const char *what) {
    const int err = errno;  // capture first: close()/shm_unlink() may overwrite errno
    close(shm_fd);
    shm_unlink(shm_name.c_str());
    spdlog::error("{}: {}", what, std::strerror(err));
    throw std::runtime_error(what);
}

}  // namespace

// === ShmManager ===

/// Creates (or opens) the shared-memory segment named by get_shm_name(), sizes it to
/// SHM_SIZE bytes, maps it shared and read/write, zero-fills it, and initializes the
/// ShmHelper placed at the start of the mapping.
///
/// @throws std::runtime_error if shm_open, ftruncate or mmap fails. On the latter two,
///         the descriptor is closed and the segment name is unlinked before throwing.
ShmManager::ShmManager() {
    const std::string shm_name = get_shm_name();
    const int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
    if (shm_fd == -1) {
        const int err = errno;
        spdlog::error("Failed to create shared memory segment: {}", std::strerror(err));
        throw std::runtime_error("Failed to create shared memory segment");
    }
    if (ftruncate(shm_fd, SHM_SIZE) == -1) {
        fail_shm_setup(shm_fd, shm_name, "Failed to set size of shared memory segment");
    }
    void *ptr = mmap(nullptr, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
    if (ptr == MAP_FAILED) {
        fail_shm_setup(shm_fd, shm_name, "Failed to map shared memory segment");
    }
    spdlog::info("Shared memory segment created, size: {} bytes", SHM_SIZE);
    // The mapping stays valid after the descriptor is closed.
    close(shm_fd);

    std::memset(ptr, 0, SHM_SIZE);
    // initialize shm_helper
    shm_helper = static_cast<ShmHelper *>(ptr);
    shm_helper->init();

    local_worker_tgids.resize(MAX_WORKERS, 0);
    stop_loop_flag.store(false, std::memory_order_release);
}

/// Unmaps the segment and removes its name with shm_unlink().
ShmManager::~ShmManager() {
    munmap(shm_helper, SHM_SIZE);
    const std::string shm_name = get_shm_name();
    shm_unlink(shm_name.c_str());
}

/// Forwards the GPU info to ShmHelper::set_gpu_info() on the shared segment.
void ShmManager::set_gpu_info(uint64_t vmem_size, uint64_t shared_handle) {
    shm_helper->set_gpu_info(vmem_size, shared_handle);
}

/// Serves worker requests until stop_loop_flag is set.
///
/// Each iteration calls process_requests() and then sleeps heartbeat_us
/// microseconds. check_heart_beats() runs on every 20th iteration, starting with
/// the first.
///
/// @throws std::runtime_error if cb_on_worker_change has not been set.
void ShmManager::run_busy_loop() {
    if (!cb_on_worker_change) {
        spdlog::error("cb_on_worker_change is not set");
        throw std::runtime_error("cb_on_worker_change is not set");
    }
    spdlog::info("ShmManager busy loop started");
    const int heart_beat_check_everyN = 20;
    int loop_cnt = 0;
    while (!stop_loop_flag.load(std::memory_order_acquire)) {
        process_requests();
        if (loop_cnt % heart_beat_check_everyN == 0) {
            check_heart_beats();
        }
        loop_cnt = (loop_cnt + 1) % heart_beat_check_everyN;
        usleep(heartbeat_us);
    }
    spdlog::info("ShmManager busy loop stopped");
}

/// Handles at most one pending request from the shared request mailbox.
///
/// Does nothing unless req_ready equals READY_STATE_REQUEST_READY. After handling,
/// it writes request.response and then publishes READY_STATE_REQUEST_PROCESSED to
/// req_ready.
///
/// For REQUEST_TYPE_REGISTER_WORKER, the TGID to register is read from
/// request.parameter. The response is the assigned heartbeat slot index, or -1
/// (converted to the response field's type) when the TGID is not positive or no
/// slot is free. Unknown request types get response 0.
void ShmManager::process_requests() {
    const uint64_t req_status = shm_helper->req_ready.load(std::memory_order_acquire);
    if (req_status != ShmHelper::READY_STATE_REQUEST_READY) {
        return;
    }
    const uint32_t type = shm_helper->request.type;
    const int32_t tgid = shm_helper->request.tgid;
    const uint64_t parameter = shm_helper->request.parameter;
    spdlog::info("Get request: type {}, TGID {}, parameter {}", type, tgid, parameter);

    // The response field's type is declared by ShmHelper; convert to it explicitly.
    using response_t = decltype(shm_helper->request.response);

    switch (type) {
    case ShmHelper::REQUEST_TYPE_REGISTER_WORKER: {
        // The TGID being registered travels in `parameter`, not in request.tgid.
        const auto worker_tgid = static_cast<int32_t>(parameter);
        int slot = -1;
        if (worker_tgid <= 0) {
            // TGID 0 marks an empty heartbeat slot, so it (or a negative value)
            // must never be registered: it would be unreapable by check_heart_beats().
            spdlog::error("Invalid worker TGID {} in register request", worker_tgid);
        } else {
            // get heart beat slot
            for (int i = 0; i < MAX_WORKERS; ++i) {
                const int32_t slot_tgid =
                    shm_helper->heart_beats[i].tgid.load(std::memory_order_acquire);
                if (slot_tgid == 0 && slot == -1) {
                    // First free slot; keep scanning in case this TGID already owns one.
                    slot = i;
                    if (local_worker_tgids[i] != 0) {
                        spdlog::error(
                            "Maybe bug: in register_worker: Worker slot {} TGID mismatch (local: {}, shm: "
                            "{})",
                            i, local_worker_tgids[i], 0);
                    }
                }
                // check repeat, maybe a worker reborn and register again
                if (slot_tgid == worker_tgid) {
                    // already registered: reuse its slot
                    spdlog::warn("Worker TGID {} already registered in slot {}", worker_tgid, i);
                    slot = i;
                    break;
                }
            }

            if (slot == -1) {
                spdlog::error("Reach max worker limit, no available heart beat slot");
            } else {
                const uint64_t cur_ts = heartbeat_ts_us();
                local_worker_tgids[slot] = worker_tgid;
                shm_helper->heart_beats[slot].tgid.store(worker_tgid, std::memory_order_release);
                shm_helper->heart_beats[slot].timestamp.store(cur_ts, std::memory_order_release);
                // A re-registering worker is already listed; don't report it twice.
                if (std::find(alive_worker_tgids.begin(), alive_worker_tgids.end(), worker_tgid) ==
                    alive_worker_tgids.end()) {
                    alive_worker_tgids.push_back(worker_tgid);
                }
                cb_on_worker_change(alive_worker_tgids);
            }
        }
        shm_helper->request.response = static_cast<response_t>(slot);
        shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
                                    std::memory_order_release);
        break;
    }
    default: {
        spdlog::error("Unknown request type {}", type);
        shm_helper->request.response = 0;
        shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
                                    std::memory_order_release);
        break;
    }
    }
}

/// Evicts workers whose last heartbeat is older than heartbeat_timeout_us.
///
/// For each evicted worker it clears the shared heartbeat slot, releases the GPU
/// lock if the worker held it, and removes it from local_worker_tgids and
/// alive_worker_tgids. If any worker was evicted, cb_on_worker_change is invoked
/// exactly once with the updated alive list.
void ShmManager::check_heart_beats() {
    bool updated = false;
    for (int i = 0; i < MAX_WORKERS; ++i) {
        auto &beat = shm_helper->heart_beats[i];
        const uint64_t ts = beat.timestamp.load(std::memory_order_acquire);
        const int32_t tgid = beat.tgid.load(std::memory_order_acquire);
        if (tgid != local_worker_tgids[i]) {
            spdlog::error("Maybe bug: Worker slot {} TGID mismatch (local: {}, shm: {})", i,
                          local_worker_tgids[i], tgid);
            local_worker_tgids[i] = tgid;
        }
        if (tgid == 0) {
            continue;  // empty slot
        }

        const uint64_t now = heartbeat_ts_us();
        // `now - ts` is unsigned: if ts were ahead of now (e.g. a worker whose clock
        // reading is ahead of ours) it would wrap and evict a live worker.
        if (now <= ts || now - ts <= heartbeat_timeout_us) {
            continue;  // still alive
        }

        // worker died
        updated = true;
        spdlog::info("Detected dead worker TGID {}", tgid);
        beat.tgid.store(0, std::memory_order_release);
        beat.timestamp.store(0, std::memory_order_release);

        // check dead lock: a lock still held by a dead worker would never be released
        const uint64_t gpu_flag = shm_helper->gpu_flag.load(std::memory_order_acquire);
        if (unpack_lock_field(gpu_flag) == 1 && unpack_tgid_field(gpu_flag) == tgid) {
            spdlog::warn("Releasing GPU lock held by dead worker TGID {}", tgid);
            shm_helper->gpu_flag.store(pack_unlocked_tgid(tgid), std::memory_order_release);
        }

        local_worker_tgids[i] = 0;
        alive_worker_tgids.erase(
            std::remove(alive_worker_tgids.begin(), alive_worker_tgids.end(), tgid),
            alive_worker_tgids.end());
        spdlog::info("Current alive workers: {}", alive_worker_tgids.size());
    }
    if (updated) {
        cb_on_worker_change(alive_worker_tgids);
    }
}