182 lines
6.4 KiB
C++
182 lines
6.4 KiB
C++
#include "shm_manager.h"
|
|
#include <algorithm>
|
|
|
|
|
|
// Creates (or opens, since O_EXCL is not used) the POSIX shared memory object
// named by get_shm_name(), resizes it to SHM_SIZE bytes, maps it read/write,
// zero-fills the mapping and initializes the ShmHelper placed at its start.
//
// Throws std::runtime_error if shm_open, ftruncate or mmap fails. The file
// descriptor is closed on every path, including the failing ones; the named
// object itself is not unlinked on failure.
ShmManager::ShmManager() {
  const std::string shm_name = get_shm_name();

  const int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
  if (shm_fd == -1) {
    spdlog::error("Failed to create shared memory segment");
    throw std::runtime_error("Failed to create shared memory segment");
  }

  if (ftruncate(shm_fd, SHM_SIZE) == -1) {
    spdlog::error("Failed to set size of shared memory segment");
    // The destructor does not run for an object whose constructor throws, so
    // the descriptor must be released here or it leaks.
    close(shm_fd);
    throw std::runtime_error("Failed to set size of shared memory segment");
  }

  void *ptr =
      mmap(nullptr, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
  // A successful mapping stays valid after the descriptor is closed, and a
  // failed one no longer needs it, so close it before branching on the result.
  close(shm_fd);
  if (ptr == MAP_FAILED) {
    spdlog::error("Failed to map shared memory segment");
    throw std::runtime_error("Failed to map shared memory segment");
  }
  spdlog::info("Shared memory segment created, size: {} bytes", SHM_SIZE);

  memset(ptr, 0, SHM_SIZE);

  // initialize shm_helper
  shm_helper = static_cast<ShmHelper *>(ptr);
  shm_helper->init();

  // One local TGID entry per heartbeat slot; 0 marks a free slot.
  local_worker_tgids.resize(MAX_WORKERS, 0);
  stop_loop_flag.store(false, std::memory_order_release);
}
|
|
|
|
// Tears down what the constructor set up: unmaps the SHM_SIZE-byte region
// pointed to by shm_helper, then unlinks the shared memory object by name.
// Return values of both calls are ignored.
ShmManager::~ShmManager() {
  munmap(shm_helper, SHM_SIZE);

  const std::string segment_name = get_shm_name();
  shm_unlink(segment_name.c_str());
}
|
|
|
|
void ShmManager::set_gpu_info(int gpu_id, uint64_t vmem_size,
|
|
uint64_t shared_handle) {
|
|
shm_helper->set_gpu_info(gpu_id, vmem_size, shared_handle);
|
|
this->valid_gpu_ids.push_back(gpu_id);
|
|
}
|
|
|
|
void ShmManager::run_busy_loop() {
|
|
if (!cb_on_worker_change) {
|
|
spdlog::error("cb_on_worker_change is not set");
|
|
throw std::runtime_error("cb_on_worker_change is not set");
|
|
}
|
|
|
|
spdlog::info("ShmManager busy loop started");
|
|
|
|
int heart_beat_check_everyN = 20;
|
|
int loop_cnt = 0;
|
|
|
|
while (!stop_loop_flag.load(std::memory_order_acquire)) {
|
|
process_requests();
|
|
|
|
if (loop_cnt % heart_beat_check_everyN == 0) {
|
|
check_heart_beats();
|
|
}
|
|
|
|
loop_cnt = (loop_cnt + 1) % heart_beat_check_everyN;
|
|
usleep(heartbeat_us);
|
|
}
|
|
|
|
spdlog::info("ShmManager busy loop stopped");
|
|
}
|
|
|
|
|
|
void ShmManager::process_requests() {
|
|
uint64_t req_status = shm_helper->req_ready.load(std::memory_order_acquire);
|
|
if (req_status == ShmHelper::READY_STATE_REQUEST_READY) {
|
|
uint32_t type = shm_helper->request.type;
|
|
int32_t tgid = shm_helper->request.tgid;
|
|
uint64_t parameter = shm_helper->request.parameter;
|
|
spdlog::info("Get request: type {}, TGID {}, parameter {}", type,
|
|
tgid, parameter);
|
|
switch (type) {
|
|
case ShmHelper::REQUEST_TYPE_REGISTER_WORKER: {
|
|
int32_t tgid = (int32_t)parameter;
|
|
// get heart beat slot
|
|
int slot = -1;
|
|
for (int i = 0; i < MAX_WORKERS; ++i) {
|
|
int32_t slot_tgid =
|
|
shm_helper->heart_beats[i].tgid.load(std::memory_order_acquire);
|
|
if (slot_tgid == 0 && slot == -1) {
|
|
slot = i;
|
|
if (local_worker_tgids[i] != 0) {
|
|
spdlog::error(
|
|
"Maybe bug: in register_worker: Worker slot {} TGID mismatch (local: {}, shm: "
|
|
"{})",
|
|
i, local_worker_tgids[i], 0);
|
|
}
|
|
}
|
|
// check repeat, maybe a worker reborn and register again
|
|
if (slot_tgid == tgid) {
|
|
// already registered
|
|
spdlog::warn("Worker TGID {} already registered in slot {}", tgid,
|
|
i);
|
|
slot = i;
|
|
break;
|
|
}
|
|
}
|
|
if (slot == -1) {
|
|
spdlog::error("Reach max worker limit, no available heart beat slot");
|
|
} else {
|
|
uint64_t cur_ts = heartbeat_ts_us();
|
|
local_worker_tgids[slot] = tgid;
|
|
shm_helper->heart_beats[slot].tgid.store(tgid,
|
|
std::memory_order_release);
|
|
shm_helper->heart_beats[slot].timestamp.store(cur_ts,
|
|
std::memory_order_release);
|
|
// register worker
|
|
alive_worker_tgids.push_back(tgid);
|
|
// set pid
|
|
cb_on_worker_change(alive_worker_tgids);
|
|
}
|
|
shm_helper->request.response = static_cast<uint64_t>(slot);
|
|
shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
|
|
std::memory_order_release);
|
|
break;
|
|
}
|
|
default: {
|
|
spdlog::error("Unknown request type {}", type);
|
|
shm_helper->request.response = 0;
|
|
shm_helper->req_ready.store(ShmHelper::READY_STATE_REQUEST_PROCESSED,
|
|
std::memory_order_release);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void ShmManager::check_heart_beats() {
|
|
bool updated = false;
|
|
for (int i = 0; i < MAX_WORKERS; ++i) {
|
|
uint64_t ts =
|
|
shm_helper->heart_beats[i].timestamp.load(std::memory_order_acquire);
|
|
int32_t tgid =
|
|
shm_helper->heart_beats[i].tgid.load(std::memory_order_acquire);
|
|
if (tgid != local_worker_tgids[i]) {
|
|
spdlog::error(
|
|
"Maybe bug: Worker slot {} TGID mismatch (local: {}, shm: {})", i,
|
|
local_worker_tgids[i], tgid);
|
|
local_worker_tgids[i] = tgid;
|
|
}
|
|
if (tgid != 0) {
|
|
uint64_t now = heartbeat_ts_us();
|
|
if (now - ts > heartbeat_timeout_us) {
|
|
// worker died
|
|
updated = true;
|
|
spdlog::info("Detected dead worker TGID {}", tgid);
|
|
shm_helper->heart_beats[i].tgid = 0;
|
|
shm_helper->heart_beats[i].timestamp.store(0,
|
|
std::memory_order_release);
|
|
// check dead lock
|
|
for (int gpu_id : valid_gpu_ids) {
|
|
uint64_t gpu_flag =
|
|
shm_helper->gpu_flag[gpu_id].load(std::memory_order_acquire);
|
|
if (unpack_lock_field(gpu_flag) == 1 &&
|
|
unpack_tgid_field(gpu_flag) == tgid) {
|
|
// release lock held by dead worker
|
|
spdlog::warn("Releasing GPU {} lock held by dead worker TGID {}",
|
|
gpu_id, tgid);
|
|
shm_helper->gpu_flag[gpu_id].store(pack_unlocked_tgid(tgid),
|
|
std::memory_order_release);
|
|
}
|
|
}
|
|
local_worker_tgids[i] = 0;
|
|
alive_worker_tgids.erase(std::remove(alive_worker_tgids.begin(),
|
|
alive_worker_tgids.end(), tgid),
|
|
alive_worker_tgids.end());
|
|
spdlog::info("Current alive workers: {}", alive_worker_tgids.size());
|
|
}
|
|
}
|
|
}
|
|
if (updated) {
|
|
cb_on_worker_change(alive_worker_tgids);
|
|
}
|
|
}
|