[CPU binding] Implement global CPU slicing and improve IRQ binding for Ascend NPUs (#6945)

### What this PR does / why we need it?

This PR introduces global CPU slicing for Ascend NPUs to ensure
non-overlapping CPU partitions, fixes logic errors in IRQ binding on
A3 devices, and enhances the logic for determining total NPUs in CPU allocation.
These changes are necessary to optimize CPU resource management and
improve system stability.

- **Global CPU Slicing**: Introduced a global CPU slicing mechanism for
Ascend NPUs to ensure non-overlapping CPU partitions across multiple
processes or data parallel groups, preventing resource contention.
- **Improved IRQ Binding for A3 Devices**: Refined the IRQ binding logic
specifically for Ascend A3 devices, correctly mapping logical NPU IDs to
physical card and chip IDs for accurate npu-smi queries and preventing
multi-process overwrite of IRQ settings.
- **Enhanced NPU Count Determination**: Improved the logic for
determining the total number of logical NPUs, prioritizing NPU mapping
information to ensure more accurate CPU allocation.
- **Minimum CPU Requirement**: Established a minimum requirement of 5
CPUs per NPU for binding, reserving specific cores for IRQ, main, ACL,
and release operations to ensure stable operation.

### Does this PR introduce _any_ user-facing change?

No user-facing changes are introduced.

### How was this patch tested?

CI passed with new added/existing tests.

- vLLM version: v0.16.0
- vLLM main:
15d76f74e2

---------

Signed-off-by: c00818886 <chenchuwei@huawei.com>
This commit is contained in:
Frank Chen
2026-03-03 17:20:52 +08:00
committed by GitHub
parent 700423156f
commit b771ca9a47
2 changed files with 249 additions and 124 deletions

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import patch
from unittest.mock import mock_open, patch
from vllm_ascend.cpu_binding import CpuAlloc, DeviceInfo, bind_cpus, is_arm_cpu
from vllm_ascend.utils import AscendDeviceType
@@ -70,6 +70,10 @@ class TestDeviceInfo(unittest.TestCase):
result = self.device_info.expand_cpu_list("0-2, 4, 6-8")
self.assertEqual(result, [0, 1, 2, 4, 6, 7, 8])
def test_get_all_logic_npus(self):
    # The setUp fixture's npu_map_info exposes exactly two logical NPUs
    # (ids 0 and 1), so the derived list and count must match.
    self.assertEqual(self.device_info.all_logic_npus, [0, 1])
    self.assertEqual(self.device_info.total_logic_npus, 2)
class TestCpuAlloc(unittest.TestCase):
@@ -107,19 +111,28 @@ class TestCpuAlloc(unittest.TestCase):
@patch('vllm_ascend.cpu_binding.get_ascend_device_type')
def test_binding_mode_table(self, mock_get_device_type):
mock_get_device_type.return_value = AscendDeviceType.A2
self.assertEqual(self.cpu_alloc._binding_mode(), "affinity")
self.assertEqual(self.cpu_alloc._binding_mode(), "topo_affinity")
mock_get_device_type.return_value = AscendDeviceType.A3
self.assertEqual(self.cpu_alloc._binding_mode(), "numa_balanced")
self.assertEqual(self.cpu_alloc._binding_mode(), "global_slice")
@patch('vllm_ascend.cpu_binding.get_ascend_device_type')
def test_build_cpu_pools_fallback_to_numa_balanced(self, mock_get_device_type):
def test_build_cpu_pools_fallback_to_global_slice(self, mock_get_device_type):
mock_get_device_type.return_value = AscendDeviceType.A2
self.cpu_alloc.device_info.npu_affinity = {}
with patch.object(self.cpu_alloc, "build_cpu_node_map") as mock_build_cpu_node_map, \
patch.object(self.cpu_alloc, "handle_no_affinity") as mock_handle_no_affinity:
patch.object(self.cpu_alloc, "build_global_slice_cpu_pool") as mock_build_global_slice_cpu_pool:
self.cpu_alloc.build_cpu_pools()
mock_build_cpu_node_map.assert_called_once()
mock_handle_no_affinity.assert_called_once()
mock_build_global_slice_cpu_pool.assert_called_once()
@patch('vllm_ascend.cpu_binding.get_ascend_device_type')
def test_build_cpu_pools_global_slice_mode(self, mock_get_device_type):
mock_get_device_type.return_value = AscendDeviceType.A3
with patch.object(self.cpu_alloc, "build_cpu_node_map") as mock_build_cpu_node_map, \
patch.object(self.cpu_alloc, "build_global_slice_cpu_pool") as mock_build_global_slice_cpu_pool:
self.cpu_alloc.build_cpu_pools()
mock_build_cpu_node_map.assert_called_once()
mock_build_global_slice_cpu_pool.assert_called_once()
def test_extend_numa(self):
result = self.cpu_alloc.extend_numa([])
@@ -146,18 +159,50 @@ class TestCpuAlloc(unittest.TestCase):
self.assertEqual(self.cpu_alloc.numa_to_cpu_map,
expected_numa_to_cpu_map)
@patch('vllm_ascend.cpu_binding.get_ascend_device_type')
@patch('vllm_ascend.cpu_binding.execute_command')
def test_handle_no_affinity(self, mock_execute_command, mock_get_device_type):
mock_get_device_type.return_value = AscendDeviceType.A3
mock_execute_command.side_effect = [("0 0\n1 1", 0), ("0 0\n1 1", 0)]
def test_build_global_slice_cpu_pool_uses_total_logic_npus(self):
    # total_logic_npus takes priority over npu_affinity for sizing slices:
    # 20 CPUs / 2 logical NPUs -> 10 CPUs each, and logical NPU 1 owns the
    # second slice [10, 20) regardless of which NPUs this process runs.
    self.cpu_alloc.device_info.running_npu_list = [1]
    self.cpu_alloc.device_info.allowed_cpus = list(range(20))
    self.cpu_alloc.device_info.total_logic_npus = 2
    self.cpu_alloc.device_info.npu_affinity = {0: [0, 1], 1: [2, 3]}
    self.cpu_alloc.build_global_slice_cpu_pool()
    self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], list(range(10, 20)))
def test_build_global_slice_cpu_pool_fallback_to_affinity_len(self):
self.cpu_alloc.device_info.running_npu_list = [0, 1]
self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3]
self.cpu_alloc.device_info.affinity = {}
self.assertEqual(self.cpu_alloc.npu_cpu_pool, {})
self.cpu_alloc.device_info.affinity = {0: [0, 1], 1: [2, 3]}
self.cpu_alloc.build_cpu_pools()
self.assertEqual(len(self.cpu_alloc.npu_cpu_pool), 2)
self.cpu_alloc.device_info.allowed_cpus = list(range(12))
self.cpu_alloc.device_info.total_logic_npus = 0
self.cpu_alloc.device_info.npu_affinity = {0: [0, 1], 1: [2, 3]}
self.cpu_alloc.build_global_slice_cpu_pool()
self.assertEqual(self.cpu_alloc.npu_cpu_pool[0], [0, 1, 2, 3, 4, 5])
self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], [6, 7, 8, 9, 10, 11])
def test_build_global_slice_cpu_pool_fallback_to_running_len(self):
    # With no mapping info (total_logic_npus == 0) and no topo affinity,
    # the NPU count falls back to len(running_npu_list):
    # 12 CPUs / 2 NPUs -> 6 contiguous CPUs per NPU.
    self.cpu_alloc.device_info.running_npu_list = [0, 1]
    self.cpu_alloc.device_info.allowed_cpus = list(range(12))
    self.cpu_alloc.device_info.total_logic_npus = 0
    self.cpu_alloc.device_info.npu_affinity = {}
    self.cpu_alloc.build_global_slice_cpu_pool()
    self.assertEqual(self.cpu_alloc.npu_cpu_pool[0], [0, 1, 2, 3, 4, 5])
    self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], [6, 7, 8, 9, 10, 11])
def test_build_global_slice_cpu_pool_raises_when_cpu_insufficient(self):
    # 8 CPUs / 2 NPUs = 4 per NPU, below the 5-CPU minimum
    # (2 IRQ + 1 main + 1 ACL + 1 release) -> must raise.
    self.cpu_alloc.device_info.running_npu_list = [0, 1]
    self.cpu_alloc.device_info.allowed_cpus = list(range(8))
    self.cpu_alloc.device_info.total_logic_npus = 2
    with self.assertRaises(RuntimeError):
        self.cpu_alloc.build_global_slice_cpu_pool()
def test_build_global_slice_cpu_pool_raises_invalid_npu_id(self):
    # A running NPU id outside [0, total_logic_npus) cannot be sliced:
    # id 2 with total_logic_npus=2 (valid ids 0..1) -> must raise.
    self.cpu_alloc.device_info.running_npu_list = [2]
    self.cpu_alloc.device_info.allowed_cpus = list(range(12))
    self.cpu_alloc.device_info.total_logic_npus = 2
    with self.assertRaises(RuntimeError):
        self.cpu_alloc.build_global_slice_cpu_pool()
@patch('vllm_ascend.cpu_binding.execute_command')
def test_allocate(self, mock_execute_command):
@@ -182,6 +227,28 @@ class TestCpuAlloc(unittest.TestCase):
self.cpu_alloc.bind_threads()
mock_execute_command.assert_called()
@patch('vllm_ascend.cpu_binding.get_ascend_device_type')
@patch('vllm_ascend.cpu_binding.os.listdir')
@patch('builtins.open', new_callable=mock_open, read_data='123: 0 0 0 0 sq_send_trigger_irq\n')
@patch('vllm_ascend.cpu_binding.shutil.which')
@patch('vllm_ascend.cpu_binding.os.access')
@patch('vllm_ascend.cpu_binding.execute_command')
def test_bind_npu_irq_a3_uses_card_chip_mapping(self, mock_execute_command, mock_access,
                                                mock_which, _mock_open, mock_listdir,
                                                mock_get_device_type):
    # On A3, logical NPU id maps to (card_id, chip_id) = (npu // 2, npu % 2),
    # so logical NPU 3 must query npu-smi with card 1, chip 1.
    mock_access.return_value = True  # pretend /proc/irq is writable so binding proceeds
    mock_which.return_value = None  # no systemctl -> skip irqbalance handling
    mock_listdir.side_effect = FileNotFoundError  # missing msi_irqs dir -> early return after the npu-smi call
    mock_get_device_type.return_value = AscendDeviceType.A3
    mock_execute_command.return_value = ("PCIe Bus Info 0000:03:00.0", 0)
    self.cpu_alloc.rank_id = 0
    self.cpu_alloc.device_info.running_npu_list = [3]
    self.cpu_alloc.npu_cpu_pool = {3: [0, 1, 2, 3, 4]}
    self.cpu_alloc.bind_npu_irq()
    mock_execute_command.assert_any_call(["npu-smi", "info", "-t", "board", "-i", "1", "-c", "1"])
class TestBindingSwitch(unittest.TestCase):

View File

@@ -12,9 +12,19 @@ from vllm.logger import logger
from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type
MASK_BIT = 32 # Number of bits in a CPU affinity mask group
MIN_CPUS_PER_NPU = 5 # 2(IRQ) + 1(main, at least 1 CPU) + 1(acl) + 1(release) = 5 CPUs per NPU
ALLOWED_CPUS_PATH = "/proc/self/status"
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
TOPO_AFFINITY_MODE = "topo_affinity"
GLOBAL_SLICE_MODE = "global_slice"
DEVICE_BINDING_MODE: dict["AscendDeviceType", str] = {
AscendDeviceType.A2: TOPO_AFFINITY_MODE,
AscendDeviceType.A3: GLOBAL_SLICE_MODE,
AscendDeviceType._310P: TOPO_AFFINITY_MODE,
}
def is_arm_cpu() -> bool:
arch = platform.machine().lower()
@@ -38,6 +48,8 @@ class DeviceInfo:
self.allowed_cpus: list[int] = self.parse_allowed_cpus()
self.running_npu_list: list[int] = self.get_running_npus()
self.npu_affinity: dict[int, list[int]] = self.parse_topo_affinity()
self.all_logic_npus: list[int] = self.get_all_logic_npus()
self.total_logic_npus: int = len(self.all_logic_npus)
@staticmethod
def expand_cpu_list(allowed_list_str: str) -> list[int]:
@@ -50,6 +62,20 @@ class DeviceInfo:
allowed_cpus_list.append(int(per_range))
return allowed_cpus_list
def get_all_logic_npus(self) -> list[int]:
    """Collect every logical NPU id present in the NPU mapping.

    ``self.npu_map_info`` maps a board_id (A3) or npu_id (A2) to a
    per-chip map whose keys are chip ids and whose values are the
    logical NPU id as a string (possibly empty or non-numeric for
    chips that expose no logical NPU).

    Returns:
        Sorted list of unique logical NPU ids.
    """
    # Only the inner values matter, so iterate .values() instead of
    # .items() and discard nothing; a set deduplicates repeated ids.
    return sorted({
        int(logic_str)
        for chip_map in self.npu_map_info.values()
        for logic_str in chip_map.values()
        if logic_str and logic_str.isdigit()
    })
@staticmethod
def get_npu_map_info() -> dict[str, dict[str, str]]:
npu_map_info: dict[str, dict[str, str]] = {}
@@ -208,95 +234,100 @@ class CpuAlloc:
if len(self.numa_to_cpu_map) == 0:
raise RuntimeError("lscpu command output error, no NUMA node available. Please check!")
def handle_no_affinity(self) -> None:
def build_global_slice_cpu_pool(self) -> None:
"""
1) Build available NUMA nodes after allowed_cpus filtering
2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes)
3) Within each NUMA node, split its CPU list into per-NPU disjoint slices
Build per-NPU CPU pools by slicing allowed_cpus using GLOBAL logical NPU ids.
Why:
- Multiple processes/DP groups may share the SAME cpuset (same allowed_cpus).
- If each process slices only its visible NPUs, CPU ranges overlap across processes.
- Global slicing ensures deterministic, non-overlapping CPU partitions per logical NPU id.
Notes:
- This strategy does NOT rely on npu-smi topo affinity.
- NUMA locality is achieved only if CPU numbering aligns with NUMA layout.
- Requires per-NPU slice size >= 5 (IRQ(2) + main(>=1) + acl(1) + release(1)).
"""
running = list(self.device_info.running_npu_list)
if not running or not self.numa_to_cpu_map:
if not running:
return
# 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering.
available_nodes: list[tuple[int, list[int]]] = []
for node in sorted(self.numa_to_cpu_map):
cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus]
if cpus:
available_nodes.append((node, cpus))
if not available_nodes:
allowed = sorted(set(self.device_info.allowed_cpus))
total_cpu = len(allowed)
if total_cpu == 0:
return
num_nodes = len(available_nodes)
# Infer "my_npu" from local rank + visible running_npu_list, assuming local rank is index into running_npu_list.
if 0 <= self.rank_id < len(running):
my_npu = running[self.rank_id]
# Prefer mapping info (npu-smi info -m), fallback to topo keys, then visible list
if self.device_info.total_logic_npus > 0:
total_npus = self.device_info.total_logic_npus
elif self.device_info.npu_affinity:
total_npus = len(self.device_info.npu_affinity)
else:
# Fallback: modulo in case rank range is larger than visible list length.
my_npu = running[self.rank_id % len(running)]
total_npus = len(running)
print(
f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; "
f"running_npu_list:{running}; num_available_nodes:{num_nodes}"
if total_npus <= 0:
return
# Compute global per-NPU slicing
base = total_cpu // total_npus
extra = total_cpu % total_npus
logger.debug(
f"[cpu_global_slice] rank:{self.rank_id} ASCEND_RT_VISIBLE_DEVICES={ASCEND_RT_VISIBLE_DEVICES} "
f"running_npu_list:{running} total_npus:{total_npus} allowed_cpus:{total_cpu} "
f"base:{base} extra:{extra} allowed_cpus_head:{allowed[:16]} allowed_cpus_tail:{allowed[-16:]}"
)
# 2) Round-robin assign NPUs to nodes based on NPU id (same as new logic).
# Build: node_index -> list[npu]
node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)}
# Enforce per-NPU slice length >= 5.
# Because with remainder distribution, some NPUs may get 'base' cores and some get 'base+1'.
# The minimum slice size is 'base'.
if base < MIN_CPUS_PER_NPU:
raise RuntimeError(
"Insufficient CPUs for binding with IRQ/ACL/REL reservations: "
f"total_allowed={total_cpu}, total_npus={total_npus}, "
f"min_per_npu={base} (<{MIN_CPUS_PER_NPU}). "
f"Need at least {total_npus * MIN_CPUS_PER_NPU} CPUs in cpuset."
)
def _slice_for_npu(global_npu_id: int) -> list[int]:
    """Return the allowed-CPU slice owned by this global logical NPU id."""
    # Remainder distribution: the first `extra` NPUs each take base+1
    # CPUs, all later NPUs take exactly `base`.
    offset = min(global_npu_id, extra)
    begin = global_npu_id * base + offset
    width = base + 1 if global_npu_id < extra else base
    return allowed[begin:begin + width]
for npu in running:
node_index = npu % num_nodes
node_to_npus[node_index].append(npu)
if npu < 0 or npu >= total_npus:
raise RuntimeError(f"Invalid NPU id {npu}, total_npus={total_npus}.")
cpus = _slice_for_npu(npu)
# Extra safety: should always be >= base >= 5
if len(cpus) < MIN_CPUS_PER_NPU:
raise RuntimeError(
f"NPU{npu} got too few CPUs: {len(cpus)} (<5). "
f"total_allowed={total_cpu}, total_npus={total_npus}, base={base}, extra={extra}"
)
self.npu_cpu_pool[npu] = cpus
# 3) Within each node, split cpus among the NPUs assigned to this node.
for node_index, npus in node_to_npus.items():
if not npus:
continue
node_id, cpus = available_nodes[node_index]
total_cpu_num = len(cpus)
n = len(npus)
# Edge case: should not happen because we filtered cpus, but keep safe.
if total_cpu_num == 0:
continue
# If CPUs are fewer than NPUs, we can only guarantee small (possibly duplicated) slices.
if total_cpu_num < n:
for i, npu in enumerate(npus):
cpu = cpus[i % total_cpu_num]
self.npu_cpu_pool[npu] = [cpu]
continue
# Even split (disjoint slices), first 'extra' NPUs take 1 more CPU.
base = total_cpu_num // n
extra = total_cpu_num % n
start = 0
for i, npu in enumerate(npus):
take = base + (1 if i < extra else 0)
end = start + take
self.npu_cpu_pool[npu] = cpus[start:end]
start = end
DEVICE_BINDING_MODE = {
AscendDeviceType.A3: "numa_balanced",
}
@classmethod
def _binding_mode(cls) -> str:
@staticmethod
def _binding_mode() -> str:
device_type = get_ascend_device_type()
return cls.DEVICE_BINDING_MODE.get(device_type, "affinity")
return DEVICE_BINDING_MODE.get(device_type, TOPO_AFFINITY_MODE)
def build_cpu_pools(self) -> None:
self.build_cpu_node_map()
if self._binding_mode() == "numa_balanced":
self.handle_no_affinity()
mode = self._binding_mode()
logger.info(f"[cpu_bind_mode] mode={mode} rank={self.rank_id} visible_npus={self.device_info.running_npu_list}")
if mode == GLOBAL_SLICE_MODE:
self.build_global_slice_cpu_pool()
return
# topo_affinity mode
if not self.device_info.npu_affinity:
logger.warning("NPU affinity info not found, fallback to NUMA-balanced CPU binding.")
self.handle_no_affinity()
logger.warning("NPU topo affinity not found, fallback to global-slice CPU binding.")
self.build_global_slice_cpu_pool()
return
for npu in self.device_info.running_npu_list:
base_cpu_list = [
cpu for cpu in self.device_info.npu_affinity.get(npu, []) if cpu in self.device_info.allowed_cpus
@@ -305,9 +336,11 @@ class CpuAlloc:
raise RuntimeError("CPUs available in 'Cpus_allowed_list' conflict with NUMA affinity.")
extra_cpu_list = self.extend_numa(base_cpu_list)
self.npu_cpu_pool[npu] = extra_cpu_list
groups = defaultdict(list)
for npu, cpus in self.npu_cpu_pool.items():
groups[str(cpus)].append(npu)
final: dict[int, list[int]] = {}
for key, npu_list in groups.items():
if len(npu_list) == 1:
@@ -318,13 +351,13 @@ class CpuAlloc:
def allocate(self) -> None:
for npu, pool in self.npu_cpu_pool.items():
if len(pool) >= 3:
main = pool[2:-2] # Reserve first two CPUs for IRQ binding
if len(pool) >= MIN_CPUS_PER_NPU:
main = pool[2:-2]
acl = [pool[-2]]
rel = [pool[-1]]
else:
raise RuntimeError(
"The number of CPUs is insufficient to bind to the NPUs. Each NPU requires at least 3 CPUs."
f"The number of CPUs is insufficient. Each NPU requires at least {MIN_CPUS_PER_NPU} CPUs."
)
self.assign_main[npu] = main
self.assign_acl[npu] = acl
@@ -383,6 +416,13 @@ class CpuAlloc:
def bind_npu_irq(self) -> None:
if not os.access("/proc/irq", os.W_OK):
return
# Only bind IRQ for current rank's NPU to avoid multi-process overwrite.
current_npu = self.device_info.running_npu_list[self.rank_id]
if current_npu not in self.npu_cpu_pool:
logger.warning(f"[irq] rank:{self.rank_id} -> NPU{current_npu} has no cpu pool, skip irq binding.")
return
if shutil.which("systemctl"):
output, _ = execute_command(["systemctl", "list-unit-files"])
if "irqbalance.service" in output:
@@ -393,48 +433,66 @@ class CpuAlloc:
"You can run the systemctl start irqbalance command to restart it."
)
execute_command(["systemctl", "stop", "irqbalance"])
sq_irqs = []
with open("/proc/interrupts") as f:
for line in f:
if "sq_send_trigger_irq" in line:
irq = line.split(":")[0].strip()
sq_irqs.append(irq)
for npu in sorted(self.npu_cpu_pool.keys()):
cpus = self.npu_cpu_pool[npu]
if len(cpus) < 2:
continue
sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding
npu = current_npu
cpus = self.npu_cpu_pool[npu]
if len(cpus) < 2:
logger.warning(f"[irq] NPU{npu} cpu pool too small (<2), skip irq binding.")
return
sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding
pci_addr = ""
device_type = get_ascend_device_type()
if device_type == AscendDeviceType.A3:
# A3: logical npu_id = card_id*2 + chip_id
card_id = npu // 2
chip_id = npu % 2
info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(card_id), "-c", str(chip_id)])
else:
# A2 / others: logical npu_id is card id
info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(npu)])
pci_addr = ""
for line in info.splitlines():
if "PCIe Bus Info" in line:
pci_addr = line.split()[-1].lower()
break
if not pci_addr:
logger.warning(f"Can't find pci address of NPU{npu} .")
continue
try:
npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x))
except FileNotFoundError:
logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .")
continue
sq_irq, cq_irq = "", ""
for irq in sq_irqs:
if irq in npu_irq_list:
sq_irq = irq
cq_irq = str(int(irq) + 1)
break
if not sq_irq:
logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.")
continue
logger.info(
f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, "
f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}"
)
with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(sq_cpu))
with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(cq_cpu))
for line in info.splitlines():
if "PCIe Bus Info" in line:
pci_addr = line.split()[-1].lower()
break
if not pci_addr:
logger.warning(f"Can't find pci address of NPU{npu} .")
return
try:
npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x))
except FileNotFoundError:
logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .")
return
sq_irq, cq_irq = "", ""
for irq in sq_irqs:
if irq in npu_irq_list:
sq_irq = irq
cq_irq = str(int(irq) + 1)
break
if not sq_irq:
logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.")
return
logger.info(
f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, "
f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}"
)
with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(sq_cpu))
with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(cq_cpu))
def run_all(self) -> None:
self.build_cpu_pools()