Files
xc-llm-ascend/vllm_ascend/cpu_binding.py
Rozwel-dx 8d571286dd [Refactor] Modify the binding logic to allocate CPU cores for each NPU card (#5555)
[Refactor] Modify the binding logic to allocate CPU cores for each NPU
card

### What this PR does / why we need it?
Modify the binding logic to allocate CPU cores for each NPU card based
on NUMA affinity, while isolating acl_thread/release_thread and other
processes to prevent mutual interference.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

c85cc045f8

Signed-off-by: rowzwel_dx <1392851715@qq.com>
- vLLM version: v0.13.0
- vLLM main:
7157596103

Signed-off-by: Rozwel-dx <1392851715@qq.com>
2026-01-13 09:21:28 +08:00

320 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import subprocess
from collections import defaultdict
from typing import Dict, List, Tuple
import psutil
from vllm.logger import logger
ALLOWED_CPUS_PATH = "/proc/self/status"
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
def execute_command(cmd: List[str]) -> Tuple[str, int]:
with subprocess.Popen(cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as p:
out, _ = p.communicate(timeout=1000)
return out.decode(), p.returncode
class DeviceInfo:
def __init__(self):
self.npu_map_info: Dict[str, Dict[str, str]] = self.get_npu_map_info()
self.allowed_cpus: List[int] = self.parse_allowed_cpus()
self.running_npu_list: List[int] = self.get_running_npus()
self.npu_affinity: Dict[int, List[int]] = self.parse_topo_affinity()
@staticmethod
def expand_cpu_list(allowed_list_str: str) -> List[int]:
allowed_cpus_list: List[int] = []
for per_range in allowed_list_str.split(","):
if "-" in per_range:
start_cpu, end_cpu = map(int, per_range.split("-"))
allowed_cpus_list.extend(range(start_cpu, end_cpu + 1))
else:
allowed_cpus_list.append(int(per_range))
return allowed_cpus_list
@staticmethod
def get_npu_map_info() -> Dict[str, Dict[str, str]]:
npu_map_info: Dict[str, Dict[str, str]] = {}
npu_info, _ = execute_command(["npu-smi", "info", "-m"])
npu_map = npu_info.strip().split("\n")[1:]
for line in npu_map:
npu_id, chip_id, chip_logic_id = line.strip().split()[:3]
if not chip_logic_id.isdigit():
continue
if npu_id not in npu_map_info:
npu_map_info[npu_id] = {}
npu_map_info[npu_id][chip_id] = chip_logic_id
return npu_map_info
def get_running_npus(self) -> List[int]:
npu_message, _ = execute_command(["npu-smi", "info"])
in_proc_section = False
running_npu_set = set()
for line in npu_message.splitlines():
line = line.strip()
if line.startswith("| NPU") and "Process id" in line:
in_proc_section = True
continue
if not in_proc_section:
continue
if line.startswith("| "):
parts = [p.strip() for p in line.strip("|").split("|")]
if len(parts) < 2:
continue
npu_id = parts[0].split()[0]
chip_id = parts[0].split()[1]
if not npu_id.isdigit() or not chip_id.isdigit():
continue
chip_logic_id = self.npu_map_info.get(npu_id, {}).get(chip_id)
if not chip_logic_id or not chip_logic_id.isdigit():
raise RuntimeError(
"Failed to get correct chip_logic_id from command 'npu-smi info -m'."
)
running_npu_set.add(int(chip_logic_id))
if ASCEND_RT_VISIBLE_DEVICES:
devices_str = ASCEND_RT_VISIBLE_DEVICES
devices_list = [int(x) for x in devices_str.split(",")]
running_npu_set = set(devices_list) & running_npu_set
if not running_npu_set:
raise RuntimeError(
"Can not get running npu info, you can use BIND_CPU=0 to skip."
)
return sorted(running_npu_set)
def parse_allowed_cpus(self) -> List[int]:
if not os.path.exists(ALLOWED_CPUS_PATH):
return []
with open(ALLOWED_CPUS_PATH) as f:
for line in f:
if line.startswith("Cpus_allowed_list"):
return self.expand_cpu_list(line.split()[1])
raise RuntimeError(
"Can not found specific 'Cpus_allowed_list' in the '/proc/self/status' file."
)
def parse_topo_affinity(self) -> Dict[int, List[int]]:
chip_logic_id = 0
affinity: Dict[int, List[int]] = {}
affinity_message, _ = execute_command(
["npu-smi", "info", "-t", "topo"])
for line in affinity_message.splitlines():
if line.startswith("NPU"):
parts = line.split()
last_part = parts[-1]
if last_part != "Affinity":
affinity[chip_logic_id] = self.expand_cpu_list(last_part)
chip_logic_id += 1
return affinity
class CpuAlloc:
def __init__(self, rank_id: int):
self.rank_id = rank_id
self.device_info: DeviceInfo = DeviceInfo()
self.cpu_node: Dict[int, int] = {}
self.numa_to_cpu_map: Dict[int, List[int]] = defaultdict(list)
self.npu_cpu_pool: Dict[int, List[int]] = {}
self.assign_main: Dict[int, List[int]] = {}
self.assign_acl: Dict[int, List[int]] = {}
self.assign_rel: Dict[int, List[int]] = {}
@staticmethod
def get_threads_map(
thread_message: str) -> Dict[str, Dict[str, List[str]]]:
threads_map: Dict[str, Dict[str, List[str]]] = {}
for line in thread_message.splitlines():
parts = line.split()
if len(parts) < 2:
continue
main_pid, sub_pid = parts[0], parts[1]
if "acl_thread" in line:
key = "acl_thread"
elif "release_thread" in line:
key = "release_thread"
else:
continue
if main_pid not in threads_map:
threads_map[main_pid] = {
"acl_thread": [],
"release_thread": []
}
threads_map[main_pid][key].append(sub_pid)
return threads_map
@staticmethod
def bind(pid: str, cpus: List[int], bind_sub_thread: bool) -> None:
if cpus:
cpu_list = ",".join(map(str, cpus))
if bind_sub_thread:
bind_result, return_code = execute_command(
["taskset", "-acp", cpu_list, pid])
else:
bind_result, return_code = execute_command(
["taskset", "-cp", cpu_list, pid])
if return_code != 0:
raise RuntimeError(f"Failed to bind {pid} to CPU {cpu_list}.")
def average_distribute(
self, groups: Dict[str, List[int]]) -> Dict[int, List[int]]:
result: Dict[int, List[int]] = {}
for key, npu_list in groups.items():
cpu_list = sorted(self.npu_cpu_pool[npu_list[0]])
cpu_num_per_npu = len(cpu_list) // len(npu_list)
for i, npu in enumerate(npu_list):
start_index = i * cpu_num_per_npu
end_index = (i + 1) * cpu_num_per_npu if i < len(
npu_list) - 1 else len(cpu_list)
result[npu] = cpu_list[start_index:end_index]
return result
def extend_numa(self, cpu_list: List[int]) -> List[int]:
if not cpu_list:
return []
nodes = {self.cpu_node[c] for c in cpu_list}
if len(nodes) != 1:
return cpu_list
node = list(nodes)[0]
next_node = (node + 1) % len(self.numa_to_cpu_map)
extended = cpu_list[:]
for cpu in self.numa_to_cpu_map[next_node]:
if cpu in self.device_info.allowed_cpus:
extended.append(cpu)
return sorted(set(extended))
def build_cpu_node_map(self) -> None:
cpu_numa_map, _ = execute_command(["lscpu", "-e=CPU,NODE"])
for line in cpu_numa_map.splitlines():
line = line.strip()
if not line or not line[0].isdigit():
continue
cpu_str, node_str = line.split()
cpu = int(cpu_str)
node = int(node_str)
self.cpu_node[cpu] = node
self.numa_to_cpu_map[node].append(cpu)
if len(self.numa_to_cpu_map) == 0:
raise RuntimeError(
"lscpu command output error, no NUMA node available. Please check!"
)
def handle_no_affinity(self) -> None:
num_running_npu = len(self.device_info.running_npu_list)
num_numa_node = len(self.numa_to_cpu_map)
if num_numa_node == 0 or num_running_npu == 0:
return
if num_running_npu % num_numa_node != 0:
npu_num_per_node = num_running_npu // num_numa_node + 1
else:
npu_num_per_node = num_running_npu // num_numa_node
index = 0
for node in sorted(self.numa_to_cpu_map):
# Available CPUs on this NUMA (constrained by allowed_cpus)
cpus = [
c for c in self.numa_to_cpu_map[node]
if c in self.device_info.allowed_cpus
]
if not cpus:
continue
# The actual number of NPUs to be allocated on this NUMA.
npu_num_this_node = min(npu_num_per_node, num_running_npu - index)
if npu_num_this_node <= 0:
break
# Evenly distribute the CPUs of this NUMA node among npu_num_this_node NPUs.
total_cpu_num = len(cpus)
base_cpu_num = total_cpu_num // npu_num_this_node
extra_cpu_num = total_cpu_num % npu_num_this_node
start_index = 0
for i in range(npu_num_this_node):
take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0)
end_index = start_index + take_cpu_num
select_cpus_list = cpus[start_index:end_index]
if index < num_running_npu:
npu = self.device_info.running_npu_list[index]
self.npu_cpu_pool[npu] = select_cpus_list
index += 1
start_index = end_index
def build_cpu_pools(self) -> None:
self.build_cpu_node_map()
if not self.device_info.npu_affinity:
self.handle_no_affinity()
return
for npu in self.device_info.running_npu_list:
base_cpu_list = [
cpu for cpu in self.device_info.npu_affinity.get(npu, [])
if cpu in self.device_info.allowed_cpus
]
if not base_cpu_list:
raise RuntimeError(
"CPUs available in 'Cpus_allowed_list' conflict with NUMA affinity."
)
extra_cpu_list = self.extend_numa(base_cpu_list)
self.npu_cpu_pool[npu] = extra_cpu_list
groups = defaultdict(list)
for npu, cpus in self.npu_cpu_pool.items():
groups[str(cpus)].append(npu)
final: Dict[int, List[int]] = {}
for key, npu_list in groups.items():
if len(npu_list) == 1:
final[npu_list[0]] = self.npu_cpu_pool[npu_list[0]]
else:
final.update(self.average_distribute({key: npu_list}))
self.npu_cpu_pool = final
def allocate(self) -> None:
for npu, pool in self.npu_cpu_pool.items():
if len(pool) >= 3:
main = pool[:-2]
acl = [pool[-2]]
rel = [pool[-1]]
else:
raise RuntimeError(
"The number of CPUs is insufficient to bind to the NPUs. "
"Each NPU requires at least 3 CPUs.")
self.assign_main[npu] = main
self.assign_acl[npu] = acl
self.assign_rel[npu] = rel
def print_plan(self) -> None:
logger.info("The CPU allocation plan is as follows:")
current_npu = self.device_info.running_npu_list[self.rank_id]
main = " ".join(map(str, self.assign_main[current_npu]))
acl = " ".join(map(str, self.assign_acl[current_npu]))
rel = str(self.assign_rel[current_npu]
) if self.assign_rel[current_npu] else ""
logger.info(
f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]")
def bind_threads(self) -> None:
thread_message, _ = execute_command(["ps", "-Te"])
threads_map = self.get_threads_map(thread_message)
main_pid = str(psutil.Process().pid)
current_npu = self.device_info.running_npu_list[self.rank_id]
self.bind(main_pid, self.assign_main[current_npu], True)
for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []):
self.bind(acl_thread, self.assign_acl[current_npu], False)
for release_thread in threads_map.get(main_pid,
{}).get("release_thread", []):
self.bind(release_thread, self.assign_rel[current_npu], False)
def run_all(self) -> None:
self.build_cpu_pools()
self.allocate()
self.print_plan()
self.bind_threads()
def bind_cpus(rank_id: int) -> None:
binder = CpuAlloc(rank_id)
binder.run_all()