diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py index c09e740a..dfcf6240 100644 --- a/vllm_ascend/cpu_binding.py +++ b/vllm_ascend/cpu_binding.py @@ -209,38 +209,75 @@ class CpuAlloc: raise RuntimeError("lscpu command output error, no NUMA node available. Please check!") def handle_no_affinity(self) -> None: - num_running_npu = len(self.device_info.running_npu_list) - num_numa_node = len(self.numa_to_cpu_map) - if num_numa_node == 0 or num_running_npu == 0: + """ + 1) Build available NUMA nodes after allowed_cpus filtering + 2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes) + 3) Within each NUMA node, split its CPU list into per-NPU disjoint slices + """ + running = list(self.device_info.running_npu_list) + if not running or not self.numa_to_cpu_map: return - if num_running_npu % num_numa_node != 0: - npu_num_per_node = num_running_npu // num_numa_node + 1 - else: - npu_num_per_node = num_running_npu // num_numa_node - index = 0 + + # 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering. + available_nodes: list[tuple[int, list[int]]] = [] for node in sorted(self.numa_to_cpu_map): - # Available CPUs on this NUMA (constrained by allowed_cpus) cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus] - if not cpus: + if cpus: + available_nodes.append((node, cpus)) + if not available_nodes: + return + + num_nodes = len(available_nodes) + + # Infer "my_npu" from local rank + visible running_npu_list, assuming local rank is index into running_npu_list. + if 0 <= self.rank_id < len(running): + my_npu = running[self.rank_id] + else: + # Fallback: modulo in case rank range is larger than visible list length. + my_npu = running[self.rank_id % len(running)] + + print( + f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; " + f"running_npu_list:{running}; num_available_nodes:{num_nodes}" + ) + + # 2) Round-robin assign NPUs to nodes based on NPU id (same as new logic). + # Build: node_index -> list[npu] + node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)} + for npu in running: + node_index = npu % num_nodes + node_to_npus[node_index].append(npu) + + # 3) Within each node, split cpus among the NPUs assigned to this node. + for node_index, npus in node_to_npus.items(): + if not npus: continue - # The actual number of NPUs to be allocated on this NUMA. - npu_num_this_node = min(npu_num_per_node, num_running_npu - index) - if npu_num_this_node <= 0: - break - # NUMA-balanced distribute the CPUs of this NUMA node among npu_num_this_node NPUs. + + node_id, cpus = available_nodes[node_index] total_cpu_num = len(cpus) - base_cpu_num = total_cpu_num // npu_num_this_node - extra_cpu_num = total_cpu_num % npu_num_this_node - start_index = 0 - for i in range(npu_num_this_node): - take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0) - end_index = start_index + take_cpu_num - select_cpus_list = cpus[start_index:end_index] - if index < num_running_npu: - npu = self.device_info.running_npu_list[index] - self.npu_cpu_pool[npu] = select_cpus_list - index += 1 - start_index = end_index + n = len(npus) + + # Edge case: should not happen because we filtered cpus, but keep safe. + if total_cpu_num == 0: + continue + + # If CPUs are fewer than NPUs, we can only guarantee small (possibly duplicated) slices. + if total_cpu_num < n: + for i, npu in enumerate(npus): + cpu = cpus[i % total_cpu_num] + self.npu_cpu_pool[npu] = [cpu] + continue + + # Even split (disjoint slices), first 'extra' NPUs take 1 more CPU. + base = total_cpu_num // n + extra = total_cpu_num % n + + start = 0 + for i, npu in enumerate(npus): + take = base + (1 if i < extra else 0) + end = start + take + self.npu_cpu_pool[npu] = cpus[start:end] + start = end DEVICE_BINDING_MODE = { AscendDeviceType.A3: "numa_balanced", @@ -302,15 +339,34 @@ class CpuAlloc: logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]") def bind_memory(self, pid: str, npu: int) -> None: + def _get_npu_numa_node(npu_id: int) -> int | None: + cpu_pool = self.npu_cpu_pool.get(npu_id, []) + if not cpu_pool: + return None + anchor_cpu = cpu_pool[0] + return self.cpu_node.get(anchor_cpu) + if not shutil.which("migratepages"): logger.info("The 'migratepages' command is not available, skipping memory binding.") return + target_numa = _get_npu_numa_node(npu) + if target_numa is None: + logger.warning(f"[migrate] rank:{self.rank_id} -> NPU{npu} has no CPU pool, skip memory binding.") + return all_numa_nodes = sorted(self.numa_to_cpu_map.keys()) - target_cpu = self.assign_acl[npu][0] - target_numa = self.cpu_node[target_cpu] - bind_numa_list = [target_numa, target_numa + 1 if target_numa % 2 == 0 else target_numa - 1] - logger.info(f"[migrate] rank:{self.rank_id} -> NUMA {bind_numa_list}") - execute_command(["migratepages", pid, ",".join(map(str, all_numa_nodes)), ",".join(map(str, bind_numa_list))]) + if target_numa not in all_numa_nodes: + logger.warning(f"[migrate] NPU:{npu} -> NUMA {target_numa} not found, skip memory binding.") + return + # Bind memory to the NPU's NUMA node only to minimize cross-NUMA traffic. + logger.info(f"[migrate] NPU:{npu} -> NUMA [{target_numa}]") + execute_command( + [ + "migratepages", + pid, + ",".join(map(str, all_numa_nodes)), + str(target_numa), + ] + ) def bind_threads(self) -> None: thread_message, _ = execute_command(["ps", "-Te"])