From a77fe932e401c238197b4e33f4e24a56affe9362 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Sun, 1 Mar 2026 20:30:43 +0800 Subject: [PATCH] [Platform] Fix CPU binding logic (#6889) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What this PR does / why we need it? - Rework CpuAlloc.handle_no_affinity() to build available NUMA nodes after allowed_cpus filtering, assign NPUs to NUMA nodes via round‑robin, and split CPUs per NPU with disjoint slices for better balance. - Improve bind_memory() robustness by deriving the target NUMA from each NPU’s CPU pool, validating NUMA existence, and skipping binding when data is missing. - bind_memory() now only bind the single NUMA node that corresponds to NPU id, instead of 2 NUMA nodes. - Fix the issue that all NPUs bind to 0th NUMA node when DP16 due to global NPU id is not visible across DP domain. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added/updated unit tests: test_cpu_binding.py 1. test_binding_mode_table covers A2 vs A3 binding mode mapping. 2. test_build_cpu_pools_fallback_to_numa_balanced covers fallback when affinity info is missing. 3. TestBindingSwitch.test_is_arm_cpu covers ARM/x86/unknown arch detection. 4. test_bind_cpus_skip_non_arm covers non‑ARM skip path in bind_cpus. test_worker_v1.py 1. Updated mocks for enable_cpu_binding default True to align with new config default. - vLLM version: v0.16.0 - vLLM main: https://github.com/vllm-project/vllm/commit/15d76f74e2fdb12a95ea00f0ca283acf6219a2b7 Signed-off-by: chenchuw886 Co-authored-by: chenchuw886 --- vllm_ascend/cpu_binding.py | 120 +++++++++++++++++++++++++++---------- 1 file changed, 88 insertions(+), 32 deletions(-) diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py index c09e740a..dfcf6240 100644 --- a/vllm_ascend/cpu_binding.py +++ b/vllm_ascend/cpu_binding.py @@ -209,38 +209,75 @@ class CpuAlloc: raise RuntimeError("lscpu command output error, no NUMA node available. Please check!") def handle_no_affinity(self) -> None: - num_running_npu = len(self.device_info.running_npu_list) - num_numa_node = len(self.numa_to_cpu_map) - if num_numa_node == 0 or num_running_npu == 0: + """ + 1) Build available NUMA nodes after allowed_cpus filtering + 2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes) + 3) Within each NUMA node, split its CPU list into per-NPU disjoint slices + """ + running = list(self.device_info.running_npu_list) + if not running or not self.numa_to_cpu_map: return - if num_running_npu % num_numa_node != 0: - npu_num_per_node = num_running_npu // num_numa_node + 1 - else: - npu_num_per_node = num_running_npu // num_numa_node - index = 0 + + # 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering. + available_nodes: list[tuple[int, list[int]]] = [] for node in sorted(self.numa_to_cpu_map): - # Available CPUs on this NUMA (constrained by allowed_cpus) cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus] - if not cpus: + if cpus: + available_nodes.append((node, cpus)) + if not available_nodes: + return + + num_nodes = len(available_nodes) + + # Infer "my_npu" from local rank + visible running_npu_list, assuming local rank is index into running_npu_list. + if 0 <= self.rank_id < len(running): + my_npu = running[self.rank_id] + else: + # Fallback: modulo in case rank range is larger than visible list length. + my_npu = running[self.rank_id % len(running)] + + print( + f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; " + f"running_npu_list:{running}; num_available_nodes:{num_nodes}" + ) + + # 2) Round-robin assign NPUs to nodes based on NPU id (same as new logic). + # Build: node_index -> list[npu] + node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)} + for npu in running: + node_index = npu % num_nodes + node_to_npus[node_index].append(npu) + + # 3) Within each node, split cpus among the NPUs assigned to this node. + for node_index, npus in node_to_npus.items(): + if not npus: continue - # The actual number of NPUs to be allocated on this NUMA. - npu_num_this_node = min(npu_num_per_node, num_running_npu - index) - if npu_num_this_node <= 0: - break - # NUMA-balanced distribute the CPUs of this NUMA node among npu_num_this_node NPUs. + + node_id, cpus = available_nodes[node_index] total_cpu_num = len(cpus) - base_cpu_num = total_cpu_num // npu_num_this_node - extra_cpu_num = total_cpu_num % npu_num_this_node - start_index = 0 - for i in range(npu_num_this_node): - take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0) - end_index = start_index + take_cpu_num - select_cpus_list = cpus[start_index:end_index] - if index < num_running_npu: - npu = self.device_info.running_npu_list[index] - self.npu_cpu_pool[npu] = select_cpus_list - index += 1 - start_index = end_index + n = len(npus) + + # Edge case: should not happen because we filtered cpus, but keep safe. + if total_cpu_num == 0: + continue + + # If CPUs are fewer than NPUs, we can only guarantee small (possibly duplicated) slices. + if total_cpu_num < n: + for i, npu in enumerate(npus): + cpu = cpus[i % total_cpu_num] + self.npu_cpu_pool[npu] = [cpu] + continue + + # Even split (disjoint slices), first 'extra' NPUs take 1 more CPU. + base = total_cpu_num // n + extra = total_cpu_num % n + + start = 0 + for i, npu in enumerate(npus): + take = base + (1 if i < extra else 0) + end = start + take + self.npu_cpu_pool[npu] = cpus[start:end] + start = end DEVICE_BINDING_MODE = { AscendDeviceType.A3: "numa_balanced", @@ -302,15 +339,34 @@ class CpuAlloc: logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]") def bind_memory(self, pid: str, npu: int) -> None: + def _get_npu_numa_node(npu_id: int) -> int | None: + cpu_pool = self.npu_cpu_pool.get(npu_id, []) + if not cpu_pool: + return None + anchor_cpu = cpu_pool[0] + return self.cpu_node.get(anchor_cpu) + if not shutil.which("migratepages"): logger.info("The 'migratepages' command is not available, skipping memory binding.") return + target_numa = _get_npu_numa_node(npu) + if target_numa is None: + logger.warning(f"[migrate] rank:{self.rank_id} -> NPU{npu} has no CPU pool, skip memory binding.") + return all_numa_nodes = sorted(self.numa_to_cpu_map.keys()) - target_cpu = self.assign_acl[npu][0] - target_numa = self.cpu_node[target_cpu] - bind_numa_list = [target_numa, target_numa + 1 if target_numa % 2 == 0 else target_numa - 1] - logger.info(f"[migrate] rank:{self.rank_id} -> NUMA {bind_numa_list}") - execute_command(["migratepages", pid, ",".join(map(str, all_numa_nodes)), ",".join(map(str, bind_numa_list))]) + if target_numa not in all_numa_nodes: + logger.warning(f"[migrate] NPU:{npu} -> NUMA {target_numa} not found, skip memory binding.") + return + # Bind memory to the NPU's NUMA node only to minimize cross-NUMA traffic. + logger.info(f"[migrate] NPU:{npu} -> NUMA [{target_numa}]") + execute_command( + [ + "migratepages", + pid, + ",".join(map(str, all_numa_nodes)), + str(target_numa), + ] + ) def bind_threads(self) -> None: thread_message, _ = execute_command(["ps", "-Te"])