From a9cca0c5c4cf7d8d1ff5e51423ace93253cba0d7 Mon Sep 17 00:00:00 2001 From: Rozwel-dx <52559239+Rozwel-dx@users.noreply.github.com> Date: Thu, 26 Feb 2026 08:49:50 +0800 Subject: [PATCH] [Refactor] Modify the binding logic, added memory migration and interrupt core binding functions. (#6785) [Refactor] Modify the binding logic, added memory migration and interrupt core binding functions. ### What this PR does / why we need it? Controls the use of memory on a closer NUMA node to achieve a lower memory access latency, while binding interrupts to different CPU cores to prevent them from interrupting the inference process. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? https://github.com/vllm-project/vllm-ascend/pull/6785/changes/b8eaaa073bc99e3a25e31c16e87bbd4acd6377eb Signed-off-by: rowzwel_dx <1392851715@qq.com> Signed-off-by: Rozwel-dx <1392851715@qq.com> - vLLM version: v0.15.0 - vLLM main: https://github.com/vllm-project/vllm/commit/9562912cead1f11e8540fb91306c5cbda66f0007 Signed-off-by: Rozwel-dx <1392851715@qq.com> --- tests/ut/device_allocator/test_cpu_binding.py | 8 +- vllm_ascend/cpu_binding.py | 83 ++++++++++++++++++- 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/tests/ut/device_allocator/test_cpu_binding.py b/tests/ut/device_allocator/test_cpu_binding.py index 2fd16f0f..89657f30 100644 --- a/tests/ut/device_allocator/test_cpu_binding.py +++ b/tests/ut/device_allocator/test_cpu_binding.py @@ -162,11 +162,11 @@ class TestCpuAlloc(unittest.TestCase): @patch('vllm_ascend.cpu_binding.execute_command') def test_allocate(self, mock_execute_command): self.cpu_alloc.device_info.running_npu_list = [0] - self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2]} + self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2, 3, 4]} self.cpu_alloc.allocate() - self.assertEqual(self.cpu_alloc.assign_main[0], [0]) - self.assertEqual(self.cpu_alloc.assign_acl[0], [1]) - self.assertEqual(self.cpu_alloc.assign_rel[0], [2]) + 
self.assertEqual(self.cpu_alloc.assign_main[0], [2]) + self.assertEqual(self.cpu_alloc.assign_acl[0], [3]) + self.assertEqual(self.cpu_alloc.assign_rel[0], [4]) self.cpu_alloc.npu_cpu_pool = {0: [0, 1]} with self.assertRaises(RuntimeError): self.cpu_alloc.allocate() diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py index fd8717de..c09e740a 100644 --- a/vllm_ascend/cpu_binding.py +++ b/vllm_ascend/cpu_binding.py @@ -2,6 +2,7 @@ import os import platform +import shutil import subprocess from collections import defaultdict @@ -10,6 +11,7 @@ from vllm.logger import logger from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type +MASK_BIT = 32 # Number of bits in a CPU affinity mask group ALLOWED_CPUS_PATH = "/proc/self/status" ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES") @@ -127,6 +129,16 @@ class CpuAlloc: self.assign_acl: dict[int, list[int]] = {} self.assign_rel: dict[int, list[int]] = {} + @staticmethod + def cpu_to_mask(cpu: int) -> str: + group = cpu // MASK_BIT + bit = cpu % MASK_BIT + value = 1 << bit + mask = f"{value:08x}" + for _ in range(1, group + 1): + mask = f"{mask},{'0' * (MASK_BIT // 4)}" + return mask + @staticmethod def get_threads_map(thread_message: str) -> dict[str, dict[str, list[str]]]: threads_map: dict[str, dict[str, list[str]]] = {} @@ -270,7 +282,7 @@ class CpuAlloc: def allocate(self) -> None: for npu, pool in self.npu_cpu_pool.items(): if len(pool) >= 3: - main = pool[:-2] + main = pool[2:-2] # Reserve first two CPUs for IRQ binding acl = [pool[-2]] rel = [pool[-1]] else: @@ -289,6 +301,17 @@ class CpuAlloc: rel = str(self.assign_rel[current_npu]) if self.assign_rel[current_npu] else "" logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]") + def bind_memory(self, pid: str, npu: int) -> None: + if not shutil.which("migratepages"): + logger.info("The 'migratepages' command is not available, skipping memory binding.") + return + all_numa_nodes = 
sorted(self.numa_to_cpu_map.keys()) + target_cpu = self.assign_acl[npu][0] + target_numa = self.cpu_node[target_cpu] + bind_numa_list = [target_numa, target_numa + 1 if target_numa % 2 == 0 else target_numa - 1] + logger.info(f"[migrate] rank:{self.rank_id} -> NUMA {bind_numa_list}") + execute_command(["migratepages", pid, ",".join(map(str, all_numa_nodes)), ",".join(map(str, bind_numa_list))]) + def bind_threads(self) -> None: thread_message, _ = execute_command(["ps", "-Te"]) threads_map = self.get_threads_map(thread_message) @@ -297,14 +320,72 @@ class CpuAlloc: self.bind(main_pid, self.assign_main[current_npu], True) for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []): self.bind(acl_thread, self.assign_acl[current_npu], False) + self.bind_memory(acl_thread, current_npu) for release_thread in threads_map.get(main_pid, {}).get("release_thread", []): self.bind(release_thread, self.assign_rel[current_npu], False) + def bind_npu_irq(self) -> None: + if not os.access("/proc/irq", os.W_OK): + return + if shutil.which("systemctl"): + output, _ = execute_command(["systemctl", "list-unit-files"]) + if "irqbalance.service" in output: + _, return_code = execute_command(["systemctl", "is-active", "--quiet", "irqbalance"]) + if return_code == 0: + logger.warning( + "The irqbalance service is running and has been stopped. " + "You can run the systemctl start irqbalance command to restart it." 
+ ) + execute_command(["systemctl", "stop", "irqbalance"]) + sq_irqs = [] + with open("/proc/interrupts") as f: + for line in f: + if "sq_send_trigger_irq" in line: + irq = line.split(":")[0].strip() + sq_irqs.append(irq) + for npu in sorted(self.npu_cpu_pool.keys()): + cpus = self.npu_cpu_pool[npu] + if len(cpus) < 2: + continue + sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding + info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(npu)]) + pci_addr = "" + for line in info.splitlines(): + if "PCIe Bus Info" in line: + pci_addr = line.split()[-1].lower() + break + if not pci_addr: + logger.warning(f"Can't find pci address of NPU{npu} .") + continue + try: + npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x)) + except FileNotFoundError: + logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .") + continue + sq_irq, cq_irq = "", "" + for irq in sq_irqs: + if irq in npu_irq_list: + sq_irq = irq + cq_irq = str(int(irq) + 1) + break + if not sq_irq: + logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.") + continue + logger.info( + f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, " + f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}" + ) + with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f: + f.write(self.cpu_to_mask(sq_cpu)) + with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f: + f.write(self.cpu_to_mask(cq_cpu)) + def run_all(self) -> None: self.build_cpu_pools() self.allocate() self.print_plan() self.bind_threads() + self.bind_npu_irq() def bind_cpus(rank_id: int) -> None: