[Platform] Fix CPU binding logic (#6889)

### What this PR does / why we need it?

- Rework CpuAlloc.handle_no_affinity() to build available NUMA nodes
after allowed_cpus filtering, assign NPUs to NUMA nodes via round‑robin,
and split CPUs per NPU with disjoint slices for better balance.
- Improve bind_memory() robustness by deriving the target NUMA from each
NPU’s CPU pool, validating NUMA existence, and skipping binding when
data is missing.
- bind_memory() now binds only the single NUMA node that corresponds to the
NPU id, instead of 2 NUMA nodes.
- Fix the issue that all NPUs bind to the 0th NUMA node under DP16, because
the global NPU id is not visible across the DP domain.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added/updated unit tests:

test_cpu_binding.py
1. test_binding_mode_table covers A2 vs A3 binding mode mapping.
2. test_build_cpu_pools_fallback_to_numa_balanced covers fallback when
affinity info is missing.
3. TestBindingSwitch.test_is_arm_cpu covers ARM/x86/unknown arch
detection.
4. test_bind_cpus_skip_non_arm covers non‑ARM skip path in bind_cpus.

test_worker_v1.py
1. Updated mocks for enable_cpu_binding default True to align with new
config default.

- vLLM version: v0.16.0
- vLLM main:
15d76f74e2

Signed-off-by: chenchuw886 <chenchuw@huawei.com>
Co-authored-by: chenchuw886 <chenchuw@huawei.com>
This commit is contained in:
Frank Chen
2026-03-01 20:30:43 +08:00
committed by GitHub
parent 5e24b26a54
commit a77fe932e4

View File

@@ -209,38 +209,75 @@ class CpuAlloc:
raise RuntimeError("lscpu command output error, no NUMA node available. Please check!")
def handle_no_affinity(self) -> None:
    """Build per-NPU CPU pools when no explicit affinity info is available.

    Strategy:
    1) Build available NUMA nodes after allowed_cpus filtering
    2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes)
    3) Within each NUMA node, split its CPU list into per-NPU disjoint slices

    Side effects:
        Populates ``self.npu_cpu_pool[npu] -> list[int]`` for every NPU in
        ``self.device_info.running_npu_list``. Returns early (leaving the
        pool untouched) when there are no running NPUs, no NUMA map, or no
        CPUs survive the ``allowed_cpus`` filter.
    """
    running = list(self.device_info.running_npu_list)
    if not running or not self.numa_to_cpu_map:
        return
    # 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering.
    available_nodes: list[tuple[int, list[int]]] = []
    for node in sorted(self.numa_to_cpu_map):
        # Available CPUs on this NUMA (constrained by allowed_cpus)
        cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus]
        if cpus:
            available_nodes.append((node, cpus))
    if not available_nodes:
        return
    num_nodes = len(available_nodes)
    # Infer "my_npu" from local rank + visible running_npu_list, assuming
    # local rank is an index into running_npu_list.
    if 0 <= self.rank_id < len(running):
        my_npu = running[self.rank_id]
    else:
        # Fallback: modulo in case rank range is larger than visible list length.
        my_npu = running[self.rank_id % len(running)]
    print(
        f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; "
        f"running_npu_list:{running}; num_available_nodes:{num_nodes}"
    )
    # 2) Round-robin assign NPUs to nodes based on NPU id.
    # Build: node_index -> list[npu]
    node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)}
    for npu in running:
        node_index = npu % num_nodes
        node_to_npus[node_index].append(npu)
    # 3) Within each node, split cpus among the NPUs assigned to this node.
    for node_index, npus in node_to_npus.items():
        if not npus:
            continue
        node_id, cpus = available_nodes[node_index]
        total_cpu_num = len(cpus)
        n = len(npus)
        # Edge case: should not happen because we filtered cpus, but keep safe.
        if total_cpu_num == 0:
            continue
        # If CPUs are fewer than NPUs, we can only guarantee small
        # (possibly duplicated) single-CPU slices.
        if total_cpu_num < n:
            for i, npu in enumerate(npus):
                cpu = cpus[i % total_cpu_num]
                self.npu_cpu_pool[npu] = [cpu]
            continue
        # Even split (disjoint slices), first 'extra' NPUs take 1 more CPU.
        base = total_cpu_num // n
        extra = total_cpu_num % n
        start = 0
        for i, npu in enumerate(npus):
            take = base + (1 if i < extra else 0)
            end = start + take
            self.npu_cpu_pool[npu] = cpus[start:end]
            start = end
DEVICE_BINDING_MODE = {
AscendDeviceType.A3: "numa_balanced",
@@ -302,15 +339,34 @@ class CpuAlloc:
logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]")
def bind_memory(self, pid: str, npu: int) -> None:
    """Migrate the process's memory pages to the NUMA node of its NPU.

    The target NUMA node is derived from the NPU's CPU pool (the NUMA node
    of the pool's first CPU), so it stays correct even when the global NPU
    id is not visible across the DP domain. Binding is skipped — with a
    log message — when ``migratepages`` is unavailable, the NPU has no CPU
    pool, or the derived NUMA node is unknown.

    Args:
        pid: Process id (as a string) whose pages are migrated.
        npu: NPU id used to look up the target NUMA node.
    """

    def _get_npu_numa_node(npu_id: int) -> int | None:
        # Anchor on the first CPU of the NPU's pool; its NUMA node is the
        # binding target. Returns None when data is missing.
        cpu_pool = self.npu_cpu_pool.get(npu_id, [])
        if not cpu_pool:
            return None
        anchor_cpu = cpu_pool[0]
        return self.cpu_node.get(anchor_cpu)

    if not shutil.which("migratepages"):
        logger.info("The 'migratepages' command is not available, skipping memory binding.")
        return
    target_numa = _get_npu_numa_node(npu)
    if target_numa is None:
        logger.warning(f"[migrate] rank:{self.rank_id} -> NPU{npu} has no CPU pool, skip memory binding.")
        return
    all_numa_nodes = sorted(self.numa_to_cpu_map.keys())
    if target_numa not in all_numa_nodes:
        logger.warning(f"[migrate] NPU:{npu} -> NUMA {target_numa} not found, skip memory binding.")
        return
    # Bind memory to the NPU's NUMA node only to minimize cross-NUMA traffic.
    logger.info(f"[migrate] NPU:{npu} -> NUMA [{target_numa}]")
    execute_command(
        [
            "migratepages",
            pid,
            ",".join(map(str, all_numa_nodes)),
            str(target_numa),
        ]
    )
def bind_threads(self) -> None:
thread_message, _ = execute_command(["ps", "-Te"])