#include "shm_manager.h"
#include "xpu_helper.h"

// NOTE(review): the original file had a third `#include` whose angle-bracket
// argument was stripped by the same mangling that removed the static_cast<...>
// template arguments below.  These are the system/stdlib headers this
// translation unit visibly uses — confirm against the original include list.
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <string>

/// Creates the POSIX shared-memory segment, sizes it to SHM_SIZE, maps it,
/// zero-fills it, and initializes the ShmHelper control structure in place.
///
/// @throws std::runtime_error if shm_open, ftruncate, or mmap fails.
ShmManager::ShmManager() {
  const std::string shm_name = get_shm_name();
  int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
  if (shm_fd == -1) {
    spdlog::error("Failed to create shared memory segment");
    throw std::runtime_error("Failed to create shared memory segment");
  }
  if (ftruncate(shm_fd, SHM_SIZE) == -1) {
    // BUGFIX: shm_fd leaked on this error path before; close and unlink so a
    // failed construction does not leave a stale segment behind (the
    // destructor will not run when the constructor throws).
    close(shm_fd);
    shm_unlink(shm_name.c_str());
    spdlog::error("Failed to set size of shared memory segment");
    throw std::runtime_error("Failed to set size of shared memory segment");
  }
  void *ptr =
      mmap(nullptr, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
  if (ptr == MAP_FAILED) {
    // BUGFIX: same fd leak as above.
    close(shm_fd);
    shm_unlink(shm_name.c_str());
    spdlog::error("Failed to map shared memory segment");
    throw std::runtime_error("Failed to map shared memory segment");
  }
  spdlog::info("Shared memory segment created, size: {} bytes", SHM_SIZE);
  // The mapping remains valid after the descriptor is closed (POSIX mmap).
  close(shm_fd);
  memset(ptr, 0, SHM_SIZE);
  // BUGFIX: the template argument was stripped from this cast in the
  // original (`static_cast(ptr)`), which does not compile.
  shm_helper = static_cast<ShmHelper *>(ptr);
  shm_helper->init();
  _cur_worker_id = 0;
  local_worker_ids.resize(MAX_WORKERS, 0);
  stop_loop_flag.store(false, std::memory_order_release);
}

/// Unmaps the shared segment and unlinks its name so the kernel can reclaim
/// it once all other mappers exit.
ShmManager::~ShmManager() {
  munmap(shm_helper, SHM_SIZE);
  std::string shm_name = get_shm_name();
  shm_unlink(shm_name.c_str());
}

/// Publishes one device's identity and IPC memory handle into shared memory
/// so workers can open the device's memory, and records the device id as
/// valid for the dead-worker lock sweep in check_heart_beats().
///
/// @param device_id           index into the per-device shm arrays
/// @param xpu_pci_addr        device PCI address published to workers
/// @param vmem_size           size of the device memory region
/// @param xpu_ipc_mem_handle  opaque IPC handle copied verbatim into shm
void ShmManager::set_xpu_info(int device_id, uint32_t xpu_pci_addr,
                              size_t vmem_size,
                              const XPUIpcMemHandle &xpu_ipc_mem_handle) {
  shm_helper->gpu_pci_addr[device_id] = xpu_pci_addr;
  shm_helper->vmem_size[device_id] = vmem_size;
  memcpy(&shm_helper->xpu_mem_handle[device_id], &xpu_ipc_mem_handle,
         sizeof(XPUIpcMemHandle));
  valid_gpu_ids.push_back(device_id);
}

/// Main service loop: polls the shared request slot every heartbeat_us
/// microseconds and checks worker heartbeats every Nth iteration, until
/// stop_loop_flag is set by another thread.
void ShmManager::run_busy_loop() {
  spdlog::info("ShmManager busy loop started");
  // Heartbeats are checked once per N request-polling iterations to keep the
  // hot path cheap.
  int heart_beat_check_everyN = 20;
  int loop_cnt = 0;
  while (!stop_loop_flag.load(std::memory_order_acquire)) {
    process_requests();
    if (loop_cnt % heart_beat_check_everyN == 0) {
      check_heart_beats();
    }
    loop_cnt = (loop_cnt + 1) % heart_beat_check_everyN;
    usleep(heartbeat_us);
  }
  spdlog::info("ShmManager busy loop stopped");
}

/// Returns the next worker id, always > 0 (0 marks a free heartbeat slot and
/// -1 signals registration failure, so neither may be handed out).
///
/// BUGFIX: the original did `++_cur_worker_id` unconditionally and then
/// checked `<= 0` — incrementing INT32_MAX is signed overflow, which is
/// undefined behavior, so the wrap check relied on UB. The id sequence
/// (1, 2, 3, ...) is unchanged; only the wrap is now well-defined.
int32_t ShmManager::get_next_worker_id() {
  if (_cur_worker_id >= std::numeric_limits<int32_t>::max() ||
      _cur_worker_id < 0) {
    _cur_worker_id = 1;
  } else {
    ++_cur_worker_id;
  }
  return _cur_worker_id;
}

/// Services at most one pending request from the shared request slot.
/// Protocol: a worker fills shm_helper->request, then publishes
/// READY_STATE_REQUEST_READY; the manager answers via request.response and
/// publishes READY_STATE_REQUEST_PROCESSED (release/acquire pairing).
void ShmManager::process_requests() {
  uint64_t req_status = shm_helper->req_ready.load(std::memory_order_acquire);
  if (req_status == ShmHelper::READY_STATE_REQUEST_READY) {
    uint32_t type = shm_helper->request.type;
    int32_t worker_id = shm_helper->request.worker_id;
    uint64_t parameter = shm_helper->request.parameter;
    spdlog::info("Get request: type {}, worker {}, parameter {}", type,
                 worker_id, parameter);
    switch (type) {
    case ShmHelper::REQUEST_TYPE_REGISTER_WORKER: {
      // Intentionally shadows the outer worker_id: a registering worker has
      // no id yet, so a fresh one is allocated here.
      int32_t worker_id = get_next_worker_id();
      // Find the first free heartbeat slot (worker_id == 0 means free), and
      // sanity-check that our local mirror agrees the slot is free.
      int slot = -1;
      for (int i = 0; i < MAX_WORKERS; ++i) {
        int32_t slot_worker_id =
            shm_helper->heart_beats[i].worker_id.load(std::memory_order_acquire);
        if (slot_worker_id == 0 && slot == -1) {
          slot = i;
          if (local_worker_ids[i] != 0) {
            spdlog::error("Maybe bug: in register_worker: Worker slot {} "
                          "worker_id mismatch (local: {}, shm: "
                          "{})",
                          i, local_worker_ids[i], 0);
          }
        }
      }
      if (slot == -1) {
        spdlog::error("Reach max worker limit, no available heart beat slot");
        worker_id = -1;
      } else {
        uint64_t cur_ts = heartbeat_ts_us();
        local_worker_ids[slot] = worker_id;
        shm_helper->heart_beats[slot].worker_id.store(worker_id,
                                                      std::memory_order_release);
        shm_helper->heart_beats[slot].timestamp.store(cur_ts,
                                                      std::memory_order_release);
        // register worker
        alive_worker_ids.push_back(worker_id);
        spdlog::info("Registered new worker {}", worker_id);
      }
      // response = slot | worker_id (slot in the high 32 bits, id in the low;
      // on failure both halves carry -1 patterns).
      // BUGFIX: both casts below had their template arguments stripped in the
      // original (`static_cast(slot)`), which does not compile.
      uint64_t resp_value = (static_cast<uint64_t>(slot) << 32) |
                            (static_cast<uint64_t>(worker_id) & 0xFFFFFFFF);
      shm_helper->request.response = resp_value;
      shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
                                  std::memory_order_release);
      break;
    }
    default: {
      spdlog::error("Unknown request type {}", type);
      shm_helper->request.response = 0;
      shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
                                  std::memory_order_release);
      break;
    }
    }
  }
}

/// Scans all heartbeat slots; any worker whose timestamp is older than
/// heartbeat_timeout_us is declared dead, and its resources are reclaimed:
/// its heartbeat slot, any device lock it held, and any request it left
/// pending in the shared request slot.
void ShmManager::check_heart_beats() {
  // NOTE(review): the original declared a local `updated` flag that was set
  // but never read — removed as dead code.
  for (int i = 0; i < MAX_WORKERS; ++i) {
    uint64_t ts =
        shm_helper->heart_beats[i].timestamp.load(std::memory_order_acquire);
    int32_t worker_id =
        shm_helper->heart_beats[i].worker_id.load(std::memory_order_acquire);
    if (worker_id != local_worker_ids[i]) {
      // Shm is the source of truth; resync the local mirror and log loudly.
      spdlog::error(
          "Maybe bug: Worker slot {} worker_id mismatch (local: {}, shm: {})",
          i, local_worker_ids[i], worker_id);
      local_worker_ids[i] = worker_id;
    }
    if (worker_id != 0) {
      uint64_t now = heartbeat_ts_us();
      if (now - ts > heartbeat_timeout_us) {
        // worker died
        spdlog::info("Detected dead worker {}", worker_id);
        shm_helper->heart_beats[i].worker_id = 0;
        shm_helper->heart_beats[i].timestamp.store(0,
                                                   std::memory_order_release);
        // Release any device lock the dead worker still holds, else other
        // workers would deadlock on it forever.
        for (int gpu_id : valid_gpu_ids) {
          uint64_t gpu_flag =
              shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
          if (unpack_lock_field(gpu_flag) == 1 &&
              unpack_worker_id_field(gpu_flag) == worker_id) {
            // release lock held by dead worker
            spdlog::warn("Releasing GPU {} lock held by dead worker {}", gpu_id,
                         worker_id);
            shm_helper->gpu_flag[gpu_id].store(
                pack_unlocked_worker_id(worker_id), std::memory_order_release);
          }
        }
        // Clear a request the dead worker posted but will never consume the
        // answer to, so the request slot does not stay wedged.
        if (shm_helper->req_ready.load(std::memory_order_acquire) !=
                ShmHelper::READY_STATE_NO_REQUEST &&
            shm_helper->request.worker_id == worker_id) {
          spdlog::warn("Clearing pending request from dead worker {}",
                       worker_id);
          shm_helper->req_ready.store(ShmHelper::READY_STATE_NO_REQUEST,
                                      std::memory_order_release);
        }
        local_worker_ids[i] = 0;
        alive_worker_ids.erase(std::remove(alive_worker_ids.begin(),
                                           alive_worker_ids.end(), worker_id),
                               alive_worker_ids.end());
        spdlog::info("Current alive workers: {}", alive_worker_ids.size());
      }
    }
  }
}