[Refactor] Modify the binding logic to allocate CPU cores for each NPU card (#5555)

[Refactor] Modify the binding logic to allocate CPU cores for each NPU
card

### What this PR does / why we need it?
Modify the binding logic to allocate CPU cores for each NPU card based
on NUMA affinity, while isolating acl_thread/release_thread and other
processes to prevent mutual interference.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

c85cc045f8

Signed-off-by: rowzwel_dx <1392851715@qq.com>
- vLLM version: v0.13.0
- vLLM main:
7157596103

Signed-off-by: Rozwel-dx <1392851715@qq.com>
This commit is contained in:
Rozwel-dx
2026-01-13 09:21:28 +08:00
committed by GitHub
parent d886b81971
commit 8d571286dd
3 changed files with 470 additions and 316 deletions

View File

@@ -0,0 +1,167 @@
import unittest
from unittest.mock import patch
from vllm_ascend.cpu_binding import CpuAlloc, DeviceInfo
class TestDeviceInfo(unittest.TestCase):
@patch('vllm_ascend.cpu_binding.execute_command')
def setUp(self, mock_execute_command):
mock_execute_command.side_effect = [
("NPU ID Chip ID Chip Logic ID Chip Name\n0 0 0 Ascend\n0 1 - Mcu\n1 0 1 Ascend",
0),
("| NPU Chip | Process id |\n| 0 0 | 1234 | vllm | 56000 |\n| 1 0 | 1235 | vllm | 56000 |",
0), ("", 0)
]
self.device_info = DeviceInfo()
@patch('vllm_ascend.cpu_binding.execute_command')
def test_get_npu_map_info(self, mock_execute_command):
execute_result_list = [
("NPU ID Chip ID Chip Logic ID Chip Phy-ID Chip Name\n0 0 0 0 Ascend\n0 1 1 1 Ascend\n0 2 - - Mcu",
0),
("NPU ID Chip ID Chip Logic ID Chip Name\n8 0 0 Ascend\n8 1 - Mcu\n9 0 1 Ascend",
0),
]
result_list = [{
'0': {
'0': '0',
'1': '1'
}
}, {
'8': {
'0': '0'
},
'9': {
'0': '1'
}
}]
for result in execute_result_list:
mock_execute_command.return_value = result
npu_map_info = self.device_info.get_npu_map_info()
expected = result_list.pop(0)
self.assertEqual(npu_map_info, expected)
@patch('vllm_ascend.cpu_binding.execute_command')
def test_get_running_npus(self, mock_execute_command):
mock_execute_command.side_effect = [
("| NPU Chip | Process id |\n| 0 1 | 1236 | vllm | 56000 |", 0),
("", 0),
("| NPU Chip | Process id |\n| 1 0 | 1236 | vllm | 56000 |", 0)
]
with self.assertRaises(RuntimeError):
self.device_info.get_running_npus()
with self.assertRaises(RuntimeError):
self.device_info.get_running_npus()
running_npus = self.device_info.get_running_npus()
self.assertEqual(len(running_npus), 1)
@patch('vllm_ascend.cpu_binding.execute_command')
def test_parse_topo_affinity(self, mock_execute_command):
mock_execute_command.return_value = (
"NPU0 X HCCS HCCS HCCS HCCS HCCS HCCS HCCS 0-3", 0)
affinity = self.device_info.parse_topo_affinity()
expected = {0: [0, 1, 2, 3]}
self.assertEqual(affinity, expected)
def test_expand_cpu_list(self):
result = self.device_info.expand_cpu_list("0-2, 4, 6-8")
self.assertEqual(result, [0, 1, 2, 4, 6, 7, 8])
class TestCpuAlloc(unittest.TestCase):
@patch('vllm_ascend.cpu_binding.execute_command')
def setUp(self, mock_execute_command):
mock_execute_command.side_effect = [
("NPU ID Chip ID Chip Logic ID Chip Name\n0 0 0 Ascend\n0 1 - Mcu\n1 0 1 Ascend",
0),
("| NPU Chip | Process id |\n| 0 0 | 1234 | vllm | 56000 |\n| 1 0 | 1235 | vllm | 56000 |",
0), ("", 0)
]
self.cpu_alloc = CpuAlloc(0)
def test_average_distribute(self):
self.cpu_alloc.npu_cpu_pool = {
0: [10, 11, 12, 13],
1: [10, 11, 12, 13]
}
groups = {"[10, 11, 12, 13]": [0, 1]}
result = self.cpu_alloc.average_distribute(groups)
self.assertEqual(result, {0: [10, 11], 1: [12, 13]})
self.cpu_alloc.npu_cpu_pool = {
0: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
2: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
}
groups = {"[0, 1, 2, 3, 4, 5]": [0, 1, 2]}
result = self.cpu_alloc.average_distribute(groups)
self.assertEqual(result, {
0: [0, 1, 2, 3],
1: [4, 5, 6, 7],
2: [8, 9, 10, 11, 12, 13]
})
def test_extend_numa(self):
result = self.cpu_alloc.extend_numa([])
self.assertEqual(result, [])
self.cpu_alloc.cpu_node = {0: 0, 1: 0, 2: 1, 3: 1}
self.cpu_alloc.numa_to_cpu_map = {0: [0, 1], 1: [2, 3]}
self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3]
result = self.cpu_alloc.extend_numa([0, 1])
self.assertEqual(result, [0, 1, 2, 3])
self.cpu_alloc.device_info.allowed_cpus = [0, 1, 3]
result = self.cpu_alloc.extend_numa([0, 1])
self.assertEqual(result, [0, 1, 3])
@patch('vllm_ascend.cpu_binding.execute_command')
def test_build_cpu_node_map(self, mock_execute_command):
mock_execute_command.return_value = ("", 0)
with self.assertRaises(RuntimeError):
self.cpu_alloc.build_cpu_node_map()
mock_execute_command.return_value = ("0 0\n1 1\n2 0\n3 1", 0)
self.cpu_alloc.build_cpu_node_map()
expected_cpu_node = {0: 0, 1: 1, 2: 0, 3: 1}
expected_numa_to_cpu_map = {0: [0, 2], 1: [1, 3]}
self.assertEqual(self.cpu_alloc.cpu_node, expected_cpu_node)
self.assertEqual(self.cpu_alloc.numa_to_cpu_map,
expected_numa_to_cpu_map)
@patch('vllm_ascend.cpu_binding.execute_command')
def test_handle_no_affinity(self, mock_execute_command):
mock_execute_command.side_effect = [("0 0\n1 1", 0), ("0 0\n1 1", 0)]
self.cpu_alloc.device_info.running_npu_list = [0, 1]
self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3]
self.cpu_alloc.device_info.affinity = {}
self.assertEqual(self.cpu_alloc.npu_cpu_pool, {})
self.cpu_alloc.device_info.affinity = {0: [0, 1], 1: [2, 3]}
self.cpu_alloc.build_cpu_pools()
self.assertEqual(len(self.cpu_alloc.npu_cpu_pool), 2)
@patch('vllm_ascend.cpu_binding.execute_command')
def test_allocate(self, mock_execute_command):
self.cpu_alloc.device_info.running_npu_list = [0]
self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2]}
self.cpu_alloc.allocate()
self.assertEqual(self.cpu_alloc.assign_main[0], [0])
self.assertEqual(self.cpu_alloc.assign_acl[0], [1])
self.assertEqual(self.cpu_alloc.assign_rel[0], [2])
self.cpu_alloc.npu_cpu_pool = {0: [0, 1]}
with self.assertRaises(RuntimeError):
self.cpu_alloc.allocate()
@patch('vllm_ascend.cpu_binding.execute_command')
def test_bind_threads(self, mock_execute_command):
thread_message = "1234 1234 ? 00:00:03 acl_thread\n4567 4567 ? 00:00:03 release_thread"
mock_execute_command.return_value = (thread_message, 0)
self.cpu_alloc.device_info.running_npu_list = [0]
self.cpu_alloc.assign_main = {0: [0, 1]}
self.cpu_alloc.assign_acl = {0: [2]}
self.cpu_alloc.assign_rel = {0: [3]}
self.cpu_alloc.bind_threads()
mock_execute_command.assert_called()
if __name__ == '__main__':
unittest.main()

View File

@@ -1,330 +1,319 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import subprocess
from dataclasses import dataclass
from itertools import accumulate
from typing import Dict, List, Optional, Tuple, Union
from collections import defaultdict
from typing import Dict, List, Tuple
import psutil
import torch_npu
from vllm.logger import logger
ALLOWED_CPUS_PATH = "/proc/self/status"
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
CPU_BINDING_NUM = os.getenv("CPU_BINDING_NUM")
def execute_command(cmd_list):
with subprocess.Popen(cmd_list,
def execute_command(cmd: List[str]) -> Tuple[str, int]:
with subprocess.Popen(cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as p:
out, err = p.communicate(timeout=1000)
res = out.decode()
return res
out, _ = p.communicate(timeout=1000)
return out.decode(), p.returncode
@dataclass
class DeviceInfo:
"""
Parse a single line of device information into structured data.
"""
_info_line: str = ""
npu_id: int = 0
chip_id: int = 0
chip_logic_id: Union[int, str] = 0
chip_name: str = ""
def __post_init__(self):
npu_id_str, chip_id_str, chip_logic_id_str, self.chip_name = self._info_line.strip(
).split(None, 3)
self.npu_id = int(npu_id_str)
self.chip_id = int(chip_id_str)
if chip_logic_id_str.isnumeric():
self.chip_logic_id = int(chip_logic_id_str)
def __init__(self):
self.npu_map_info: Dict[str, Dict[str, str]] = self.get_npu_map_info()
self.allowed_cpus: List[int] = self.parse_allowed_cpus()
self.running_npu_list: List[int] = self.get_running_npus()
self.npu_affinity: Dict[int, List[int]] = self.parse_topo_affinity()
@staticmethod
def expand_cpu_list(allowed_list_str: str) -> List[int]:
allowed_cpus_list: List[int] = []
for per_range in allowed_list_str.split(","):
if "-" in per_range:
start_cpu, end_cpu = map(int, per_range.split("-"))
allowed_cpus_list.extend(range(start_cpu, end_cpu + 1))
else:
allowed_cpus_list.append(int(per_range))
return allowed_cpus_list
class NpuHbmInfo:
visible_npu_ids: Optional[List[int]] = None
hbm_capacity: Optional[int] = None
hbm_usage: Optional[int] = None
@staticmethod
def get_npu_map_info() -> Dict[str, Dict[str, str]]:
npu_map_info: Dict[str, Dict[str, str]] = {}
npu_info, _ = execute_command(["npu-smi", "info", "-m"])
npu_map = npu_info.strip().split("\n")[1:]
for line in npu_map:
npu_id, chip_id, chip_logic_id = line.strip().split()[:3]
if not chip_logic_id.isdigit():
continue
if npu_id not in npu_map_info:
npu_map_info[npu_id] = {}
npu_map_info[npu_id][chip_id] = chip_logic_id
return npu_map_info
@classmethod
def set_visible_devices(cls, world_size):
"""
Determine which NPUs are visible to the current process and cache their
logical NPU IDs in `cls.visible_npu_ids`.
"""
if cls.visible_npu_ids:
return
if ASCEND_RT_VISIBLE_DEVICES is None:
devices = sorted(list(_get_device_map_info().keys()))
else:
def get_running_npus(self) -> List[int]:
npu_message, _ = execute_command(["npu-smi", "info"])
in_proc_section = False
running_npu_set = set()
for line in npu_message.splitlines():
line = line.strip()
if line.startswith("| NPU") and "Process id" in line:
in_proc_section = True
continue
if not in_proc_section:
continue
if line.startswith("| "):
parts = [p.strip() for p in line.strip("|").split("|")]
if len(parts) < 2:
continue
npu_id = parts[0].split()[0]
chip_id = parts[0].split()[1]
if not npu_id.isdigit() or not chip_id.isdigit():
continue
chip_logic_id = self.npu_map_info.get(npu_id, {}).get(chip_id)
if not chip_logic_id or not chip_logic_id.isdigit():
raise RuntimeError(
"Failed to get correct chip_logic_id from command 'npu-smi info -m'."
)
running_npu_set.add(int(chip_logic_id))
if ASCEND_RT_VISIBLE_DEVICES:
devices_str = ASCEND_RT_VISIBLE_DEVICES
devices = [int(x) for x in devices_str.split(",")]
device_map_info = _get_device_map_info()
npu_ids = []
for device in devices:
device_info = device_map_info.get(device)
if device_info is None:
devices_list = [int(x) for x in devices_str.split(",")]
running_npu_set = set(devices_list) & running_npu_set
if not running_npu_set:
raise RuntimeError(
"Can not get running npu info, you can use BIND_CPU=0 to skip."
)
return sorted(running_npu_set)
def parse_allowed_cpus(self) -> List[int]:
if not os.path.exists(ALLOWED_CPUS_PATH):
return []
with open(ALLOWED_CPUS_PATH) as f:
for line in f:
if line.startswith("Cpus_allowed_list"):
return self.expand_cpu_list(line.split()[1])
raise RuntimeError(
"Can not found specific 'Cpus_allowed_list' in the '/proc/self/status' file."
)
def parse_topo_affinity(self) -> Dict[int, List[int]]:
chip_logic_id = 0
affinity: Dict[int, List[int]] = {}
affinity_message, _ = execute_command(
["npu-smi", "info", "-t", "topo"])
for line in affinity_message.splitlines():
if line.startswith("NPU"):
parts = line.split()
last_part = parts[-1]
if last_part != "Affinity":
affinity[chip_logic_id] = self.expand_cpu_list(last_part)
chip_logic_id += 1
return affinity
class CpuAlloc:
def __init__(self, rank_id: int):
self.rank_id = rank_id
self.device_info: DeviceInfo = DeviceInfo()
self.cpu_node: Dict[int, int] = {}
self.numa_to_cpu_map: Dict[int, List[int]] = defaultdict(list)
self.npu_cpu_pool: Dict[int, List[int]] = {}
self.assign_main: Dict[int, List[int]] = {}
self.assign_acl: Dict[int, List[int]] = {}
self.assign_rel: Dict[int, List[int]] = {}
@staticmethod
def get_threads_map(
thread_message: str) -> Dict[str, Dict[str, List[str]]]:
threads_map: Dict[str, Dict[str, List[str]]] = {}
for line in thread_message.splitlines():
parts = line.split()
if len(parts) < 2:
continue
main_pid, sub_pid = parts[0], parts[1]
if "acl_thread" in line:
key = "acl_thread"
elif "release_thread" in line:
key = "release_thread"
else:
continue
if main_pid not in threads_map:
threads_map[main_pid] = {
"acl_thread": [],
"release_thread": []
}
threads_map[main_pid][key].append(sub_pid)
return threads_map
@staticmethod
def bind(pid: str, cpus: List[int], bind_sub_thread: bool) -> None:
if cpus:
cpu_list = ",".join(map(str, cpus))
if bind_sub_thread:
bind_result, return_code = execute_command(
["taskset", "-acp", cpu_list, pid])
else:
bind_result, return_code = execute_command(
["taskset", "-cp", cpu_list, pid])
if return_code != 0:
raise RuntimeError(f"Failed to bind {pid} to CPU {cpu_list}.")
def average_distribute(
self, groups: Dict[str, List[int]]) -> Dict[int, List[int]]:
result: Dict[int, List[int]] = {}
for key, npu_list in groups.items():
cpu_list = sorted(self.npu_cpu_pool[npu_list[0]])
cpu_num_per_npu = len(cpu_list) // len(npu_list)
for i, npu in enumerate(npu_list):
start_index = i * cpu_num_per_npu
end_index = (i + 1) * cpu_num_per_npu if i < len(
npu_list) - 1 else len(cpu_list)
result[npu] = cpu_list[start_index:end_index]
return result
def extend_numa(self, cpu_list: List[int]) -> List[int]:
if not cpu_list:
return []
nodes = {self.cpu_node[c] for c in cpu_list}
if len(nodes) != 1:
return cpu_list
node = list(nodes)[0]
next_node = (node + 1) % len(self.numa_to_cpu_map)
extended = cpu_list[:]
for cpu in self.numa_to_cpu_map[next_node]:
if cpu in self.device_info.allowed_cpus:
extended.append(cpu)
return sorted(set(extended))
def build_cpu_node_map(self) -> None:
cpu_numa_map, _ = execute_command(["lscpu", "-e=CPU,NODE"])
for line in cpu_numa_map.splitlines():
line = line.strip()
if not line or not line[0].isdigit():
continue
cpu_str, node_str = line.split()
cpu = int(cpu_str)
node = int(node_str)
self.cpu_node[cpu] = node
self.numa_to_cpu_map[node].append(cpu)
if len(self.numa_to_cpu_map) == 0:
raise RuntimeError(
"lscpu command output error, no NUMA node available. Please check!"
)
def handle_no_affinity(self) -> None:
num_running_npu = len(self.device_info.running_npu_list)
num_numa_node = len(self.numa_to_cpu_map)
if num_numa_node == 0 or num_running_npu == 0:
return
if num_running_npu % num_numa_node != 0:
npu_num_per_node = num_running_npu // num_numa_node + 1
else:
npu_num_per_node = num_running_npu // num_numa_node
index = 0
for node in sorted(self.numa_to_cpu_map):
# Available CPUs on this NUMA (constrained by allowed_cpus)
cpus = [
c for c in self.numa_to_cpu_map[node]
if c in self.device_info.allowed_cpus
]
if not cpus:
continue
# The actual number of NPUs to be allocated on this NUMA.
npu_num_this_node = min(npu_num_per_node, num_running_npu - index)
if npu_num_this_node <= 0:
break
# Evenly distribute the CPUs of this NUMA node among npu_num_this_node NPUs.
total_cpu_num = len(cpus)
base_cpu_num = total_cpu_num // npu_num_this_node
extra_cpu_num = total_cpu_num % npu_num_this_node
start_index = 0
for i in range(npu_num_this_node):
take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0)
end_index = start_index + take_cpu_num
select_cpus_list = cpus[start_index:end_index]
if index < num_running_npu:
npu = self.device_info.running_npu_list[index]
self.npu_cpu_pool[npu] = select_cpus_list
index += 1
start_index = end_index
def build_cpu_pools(self) -> None:
self.build_cpu_node_map()
if not self.device_info.npu_affinity:
self.handle_no_affinity()
return
for npu in self.device_info.running_npu_list:
base_cpu_list = [
cpu for cpu in self.device_info.npu_affinity.get(npu, [])
if cpu in self.device_info.allowed_cpus
]
if not base_cpu_list:
raise RuntimeError(
f"Device {device} not found in device_map_info")
npu_ids.append(device_info.npu_id)
cls.visible_npu_ids = npu_ids
"CPUs available in 'Cpus_allowed_list' conflict with NUMA affinity."
)
extra_cpu_list = self.extend_numa(base_cpu_list)
self.npu_cpu_pool[npu] = extra_cpu_list
groups = defaultdict(list)
for npu, cpus in self.npu_cpu_pool.items():
groups[str(cpus)].append(npu)
final: Dict[int, List[int]] = {}
for key, npu_list in groups.items():
if len(npu_list) == 1:
final[npu_list[0]] = self.npu_cpu_pool[npu_list[0]]
else:
final.update(self.average_distribute({key: npu_list}))
self.npu_cpu_pool = final
@classmethod
def get_hbm_capacity(cls, rank, world_size, need_nz):
"""
Query and cache the HBM (or DDR) capacity in **bytes** for the NPU assigned
to the current process.
"""
soc_version = torch_npu._C._npu_get_soc_version()
if cls.hbm_capacity:
return cls.hbm_capacity
if not cls.visible_npu_ids:
cls.set_visible_devices(world_size)
assert cls.visible_npu_ids is not None
npu_id = cls.visible_npu_ids[rank]
memory_info = execute_command(
["npu-smi", "info", "-i", f"{npu_id}", "-t",
"memory"]).split("\n")[1:]
if soc_version == 240:
hbm_capacity_key = 'Capacity(MB)'
elif not need_nz:
hbm_capacity_key = 'HBM Capacity(MB)'
else:
hbm_capacity_key = 'DDR Capacity(MB)'
for line in memory_info:
try:
key, value = line.strip().split(':', 2)
if key.strip() == hbm_capacity_key:
cls.hbm_capacity = int(value.strip()) * 1024 * 1024
return cls.hbm_capacity
except ValueError:
pass
raise ValueError('not found valid hbm capactiy')
def allocate(self) -> None:
for npu, pool in self.npu_cpu_pool.items():
if len(pool) >= 3:
main = pool[:-2]
acl = [pool[-2]]
rel = [pool[-1]]
else:
raise RuntimeError(
"The number of CPUs is insufficient to bind to the NPUs. "
"Each NPU requires at least 3 CPUs.")
self.assign_main[npu] = main
self.assign_acl[npu] = acl
self.assign_rel[npu] = rel
@classmethod
def get_hbm_usage(cls, rank, world_size, need_nz):
"""
Return the current HBM or DDR usage
ratio (0-1) for the NPU assigned to the given rank.
"""
if cls.hbm_usage:
return cls.hbm_usage
if not cls.visible_npu_ids:
cls.set_visible_devices(world_size)
assert cls.visible_npu_ids is not None
npu_id = cls.visible_npu_ids[rank]
usage_info = execute_command(
["npu-smi", "info", "-i", f"{npu_id}", "-t",
"usages"]).split("\n")[1:]
soc_version = torch_npu._C._npu_get_soc_version()
if soc_version == 240:
hbm_capacity_key = 'Memory Usage Rate(%)'
elif not need_nz:
hbm_capacity_key = 'HBM Usage Rate(%)'
else:
hbm_capacity_key = 'DDR Usage Rate(%)'
for line in usage_info:
try:
key, value = line.strip().split(':', 2)
if key.strip() == hbm_capacity_key:
hbm_usage = (float(value.strip()) + 1) / 100
return hbm_usage
except ValueError:
pass
raise ValueError('not found valid hbm usage')
def print_plan(self) -> None:
logger.info("The CPU allocation plan is as follows:")
current_npu = self.device_info.running_npu_list[self.rank_id]
main = " ".join(map(str, self.assign_main[current_npu]))
acl = " ".join(map(str, self.assign_acl[current_npu]))
rel = str(self.assign_rel[current_npu]
) if self.assign_rel[current_npu] else ""
logger.info(
f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]")
def bind_threads(self) -> None:
thread_message, _ = execute_command(["ps", "-Te"])
threads_map = self.get_threads_map(thread_message)
main_pid = str(psutil.Process().pid)
current_npu = self.device_info.running_npu_list[self.rank_id]
self.bind(main_pid, self.assign_main[current_npu], True)
for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []):
self.bind(acl_thread, self.assign_acl[current_npu], False)
for release_thread in threads_map.get(main_pid,
{}).get("release_thread", []):
self.bind(release_thread, self.assign_rel[current_npu], False)
def run_all(self) -> None:
self.build_cpu_pools()
self.allocate()
self.print_plan()
self.bind_threads()
def _get_device_map_info() -> Dict[int, DeviceInfo]:
"""
Build and return a mapping from logical chip ID (int) to its DeviceInfo object.
"""
device_map_info = {}
device_map = execute_command(["npu-smi", "info",
"-m"]).strip().split("\n")[1:]
for line in device_map:
device_info = DeviceInfo(line.strip())
if isinstance(device_info.chip_logic_id, int):
device_map_info[device_info.chip_logic_id] = device_info
return device_map_info
def _get_pcie_info(devices: List[int], keyword="PCIeBusInfo"):
"""
Query each NPU in the given device list and return a mapping
from logical device ID to its PCIe bus address.
"""
device_map_info = _get_device_map_info()
device_pcie_tbl = {}
for device in devices:
device_info = device_map_info.get(device)
if not device_info:
raise RuntimeError(
"Can not get device info, you can use BIND_CPU=0 to skip.")
pcie_info = execute_command([
"npu-smi", "info", "-t", "board", "-i", f"{device_info.npu_id}",
"-c", f"{device_info.chip_id}"
]).strip().split("\n")
for _ in pcie_info:
line = ''.join(_.split())
if line.startswith(keyword):
device_pcie_tbl[device] = line[len(keyword) + 1:]
break
return device_pcie_tbl
def _get_numa_info(pcie_tbl, keyword="NUMAnode"):
"""
Build two mappings: device → NUMA node, and NUMA node → [devices].
"""
device_numa_tbl: Dict[int, int] = {} # device id -> numa id
numa_devices_tbl: Dict[int, List[int]] = {} # numa id -> device ids
for device, pcie_no in pcie_tbl.items():
numa_info = execute_command(["lspci", "-s", f"{pcie_no}",
"-vvv"]).split("\n")
for _ in numa_info:
line = ''.join(_.split())
if line.startswith(keyword):
numa_id = int(line[len(keyword) + 1:])
device_numa_tbl[device] = numa_id
devices = numa_devices_tbl.get(numa_id, None)
if devices is None:
numa_devices_tbl[numa_id] = list()
numa_devices_tbl[numa_id].append(device)
break
return device_numa_tbl, numa_devices_tbl
def _get_numa_info_v2(
devices: List[int],
keyword="NUMAnode(s)") -> Tuple[Dict[int, int], Dict[int, List[int]]]:
"""
Evenly distribute the given device list across all NUMA nodes and return
both device-to-numa and numa-to-devices mappings.
"""
numa_nodes = 1
numa_info = execute_command(["lscpu"]).split("\n")
for _ in numa_info:
line = ''.join(_.split())
if keyword not in line:
continue
numa_nodes = int(line[-1])
break
device_per_numa, tail_device = divmod(len(devices), numa_nodes)
device_count_per_numa_list = [
device_per_numa + (i < tail_device) for i in range(numa_nodes)
]
ends = list(accumulate(device_count_per_numa_list))
starts = [0] + ends[:-1]
numa_devices_tbl = {
ind: devices[start:end]
for ind, (start, end) in enumerate(zip(starts, ends))
}
device_numa_tbl = {
device: numa
for numa, _devices in numa_devices_tbl.items()
for device in _devices
}
return device_numa_tbl, numa_devices_tbl
def _get_cpu_info(numa_ids, keyword1="NUMAnode", keyword2="CPU(s)"):
"""
Parse lscpu output to build a dict that maps each NUMA
node ID to the list of CPU core IDs belonging to it.
"""
cpu_idx_tbl = dict()
numa_keywords = [keyword1 + str(idx) + keyword2 for idx in numa_ids]
cpu_info = execute_command(["lscpu"]).split("\n")
for _ in cpu_info:
line = ''.join(_.split())
if any(line.startswith(word) for word in numa_keywords):
split_info = line.split(":")
cpu_id_ranges = split_info[-1].split(",")
ranges = list()
for range_str in cpu_id_ranges:
endpoints = range_str.split("-")
if len(endpoints) != 2:
raise Exception(
"lscpu command output error, please check !")
ranges += [
cid for cid in range(int(endpoints[0]),
int(endpoints[1]) + 1)
]
numa_id = int(split_info[0].replace(keyword1,
'').replace(keyword2, ''))
cpu_idx_tbl[numa_id] = ranges
return cpu_idx_tbl
def bind_cpus(rank_id, ratio=0.5):
# get all visible devices
visible_devices = ASCEND_RT_VISIBLE_DEVICES
if visible_devices is None:
devices = sorted(list(_get_device_map_info().keys()))
else:
devices = [int(x) for x in visible_devices.split(",")]
# Query the NUMA affinity of each NPU via its PCIe address; if this fails,
# fall back to evenly distributing the devices across NUMA nodes.
device_pcie_tbl = _get_pcie_info(devices)
device_numa_tbl, numa_devices_tbl = _get_numa_info(device_pcie_tbl)
if not device_numa_tbl or not numa_devices_tbl:
device_numa_tbl, numa_devices_tbl = _get_numa_info_v2(devices)
# Obtain the complete list of CPU cores for each NUMA node.
cpu_idx_tbl = _get_cpu_info(list(numa_devices_tbl.keys()))
cur_device = devices[rank_id]
numa_id = device_numa_tbl.get(cur_device)
# Within the NUMA node, evenly partition the CPU cores
# among all NPUs (or use the amount specified by CPU_BINDING_NUM)
shard_devices = numa_devices_tbl.get(numa_id)
shard_devices.sort()
all_cpus = cpu_idx_tbl.get(numa_id)
logger.info(
f"rank_id: {rank_id}, device_id: {cur_device}, "
f"numa_id: {numa_id}, shard_devices: {shard_devices}, cpus: {all_cpus}"
)
cpu_nums = len(all_cpus)
if CPU_BINDING_NUM is None:
cpu_num_per_device = int(cpu_nums * ratio // len(shard_devices))
else:
cpu_num_per_device = int(CPU_BINDING_NUM)
if len(shard_devices) * cpu_num_per_device > cpu_nums:
raise RuntimeError(
f"Cpu num in numa {numa_id} to assign {cpu_num_per_device} for every device is not enough, "
f"please decrease the value of CPU_BINDING_NUM!")
if cpu_num_per_device < 0:
raise ValueError("CPU_BINDING_NUM should not be less than 0.")
idx = shard_devices.index(cur_device)
binding_cpus = [
all_cpus[_] for _ in range(idx * cpu_num_per_device, (idx + 1) *
cpu_num_per_device)
]
# cpu bind
p = psutil.Process()
p.cpu_affinity(binding_cpus)
new_affinity = p.cpu_affinity()
logger.info(
f"process {p.pid}, new_affinity is {new_affinity}, cpu count {cpu_num_per_device}"
)
def bind_cpus(rank_id: int) -> None:
binder = CpuAlloc(rank_id)
binder.run_all()

View File

@@ -115,17 +115,6 @@ class NPUWorker(WorkerBase):
distributed_init_method=distributed_init_method,
is_driver_worker=is_driver_worker)
# binding cpu
if get_ascend_config().enable_cpu_binding:
try:
bind_cpus(self.local_rank, ratio=1.0)
except RuntimeError as e:
logger.error(f"{e} in {self.local_rank}")
except ValueError as e:
logger.error(f"{e} in {self.local_rank}")
except Exception:
logger.info("Skip binding cpu.")
if self.cache_config.cache_dtype == "auto":
self.cache_dtype = self.model_config.dtype
else:
@@ -238,6 +227,15 @@ class NPUWorker(WorkerBase):
set_random_seed(self.model_config.seed)
# Initialize device properties used by triton kernels.
init_device_properties_triton()
# binding cpu
if get_ascend_config().enable_cpu_binding:
try:
bind_cpus(self.local_rank)
except Exception as e:
logger.warning(
f"Bind cpus failed in rank{self.local_rank}: {e} Skip binding cpu."
)
return device
def init_device(self):