CPU: map changes from developing branch in sgl-kernel (#6833)

Co-authored-by: mingfeima <mingfei.ma@intel.com>
This commit is contained in:
YanbingJiang
2025-06-10 16:08:15 +08:00
committed by GitHub
parent 81372f3bef
commit fcde67b016
20 changed files with 1321 additions and 321 deletions

View File

@@ -54,7 +54,8 @@ void shared_open(SharedData* data, const char* name, size_t nbytes) {
void shared_create(SharedData* data, const char* name, void* bytes, size_t nbytes) {
int d = shm_open(name, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (d != -1) {
if (nbytes = write(d, bytes, nbytes)) {
nbytes = write(d, bytes, nbytes);
if (nbytes > 0) {
shared_open(data, name, nbytes);
}
} else {
@@ -391,7 +392,7 @@ void reduce_fp32_buffers(int start_elements, int num_elements, char* to_buffer,
static bool is_initialized = false;
static int world_rank;
void shm_initialize(int size, int rank, char* addr_string, char* port_string) {
void shm_initialize(int size, int rank, const char* addr_string, const char* port_string) {
if (is_initialized) {
return;
}
@@ -409,7 +410,7 @@ void shm_initialize(int size, int rank, char* addr_string, char* port_string) {
struct allreduce_workspace* workspace_buf;
struct allreduce_workspace* workspace_buf_other;
workspace_buf = (struct allreduce_workspace*)malloc(sizeof(struct allreduce_workspace));
snprintf(shm_name, NAME_BUF_SIZE, "%s_%d", shm_name_prefix, rank);
snprintf(shm_name, NAME_BUF_SIZE, "%.900s_%d", shm_name_prefix, rank);
shared_create(&allreduce_buffer, shm_name, workspace_buf, sizeof(struct allreduce_workspace));
workspace_buf = (struct allreduce_workspace*)allreduce_buffer.bytes;
workspace_buf->states[0] = coll_alt2_allreduce_naive__copy_in_done;
@@ -425,7 +426,7 @@ void shm_initialize(int size, int rank, char* addr_string, char* port_string) {
// map shm of all ranks
for (int i = 0; i < size; i++) {
if (i != rank) {
snprintf(shm_name, NAME_BUF_SIZE, "%s_%d", shm_name_prefix, i);
snprintf(shm_name, NAME_BUF_SIZE, "%.900s_%d", shm_name_prefix, i);
// printf("open %s, %d\n", shm_name, rank);
do {
shared_open(&allreduce_buffer, shm_name, sizeof(struct allreduce_workspace));
@@ -447,13 +448,13 @@ static void parallel_memcpy(void* to, void* from, size_t n_bytes) {
auto aligned_bytes = n_bytes - (n_bytes % VECTOR_LENGTH_IN_BYTES);
// process aligned part
#pragma omp parallel for
for (int i = 0; i < aligned_bytes; i += VECTOR_LENGTH_IN_BYTES) {
for (size_t i = 0; i < aligned_bytes; i += VECTOR_LENGTH_IN_BYTES) {
auto val = _mm256_loadu_si256((__m256i*)((char*)from + i));
_mm256_storeu_si256((__m256i*)((char*)to + i), val);
}
// process remaining part
for (int i = aligned_bytes; i < n_bytes; i++) {
for (size_t i = aligned_bytes; i < n_bytes; i++) {
*((char*)to + i) = *((char*)from + i);
}
}
@@ -481,7 +482,9 @@ void symmetric_naive_all_reduce(char* data_ptr, c10::ScalarType scalar_type, siz
static int current_buffer = 0;
static int state_idx = 0;
enum coll_state copy_current, copy_next;
// init states to case 0 to get rid of "maybe-uninitialized" warning.
enum coll_state copy_current = coll_allreduce_naive__copy_in_done;
enum coll_state copy_next = coll_alt1_allreduce_naive__copy_in_done;
switch (state_idx) {
case 0:
@@ -526,7 +529,10 @@ void distributed_naive_reduce(char* data_ptr, c10::ScalarType scalar_type, size_
static int current_buffer = 0;
static int state_idx = 0;
enum coll_state copy_current, copy_next, reduce_current;
// init states to case 0 to get rid of "maybe-uninitialized" warning.
enum coll_state copy_current = coll_allreduce_naive__copy_in_done;
enum coll_state reduce_current = coll_allreduce_naive__reduce_done;
enum coll_state copy_next = coll_alt1_allreduce_naive__copy_in_done;
// similar to symmetric_naive_allreduce, but here we only need two sets of
// states, because distributed naive reduce has two barriers in the algorithm
@@ -601,7 +607,9 @@ void naive_all_gather(char* result_ptr, char* data_ptr, size_t res_stride, size_
static int current_buffer = 0;
static int state_idx = 0;
enum coll_state copy_current, copy_next;
// init states to case 0 to get rid of "maybe-uninitialized" warning.
enum coll_state copy_current = coll_allgather_naive__copy_in_done;
enum coll_state copy_next = coll_alt1_allgather_naive__copy_in_done;
switch (state_idx) {
case 0:
@@ -621,7 +629,6 @@ void naive_all_gather(char* result_ptr, char* data_ptr, size_t res_stride, size_
}
state_idx = (state_idx + 1) % 3;
int data_size = chunk_size / chunk_el;
parallel_memcpy(distributed_buffer[current_buffer][world_rank], data_ptr, chunk_size);
std::atomic_thread_fence(std::memory_order_release);
workspace[world_rank]->states[state_group] = copy_current;
@@ -644,7 +651,7 @@ torch::Tensor& all_gather(torch::Tensor& result, torch::Tensor& data, int dim, s
auto data_ptr = (char*)(data.data_ptr());
auto result_ptr = (char*)(result.data_ptr());
for (int i = 0; i < dim_count; i++) {
for (int offset = 0; offset < dim_size; offset += MAX_BUF_SIZE) {
for (size_t offset = 0; offset < dim_size; offset += MAX_BUF_SIZE) {
size_t chunk_size = dim_size - offset > MAX_BUF_SIZE ? MAX_BUF_SIZE : dim_size - offset;
size_t chunk_el = chunk_size / dtype_size;
naive_all_gather(