[Refactor] Modify the binding logic, added memory migration and interrupt core binding functions. (#6785)

[Refactor] Modify the binding logic, added memory migration and
interrupt core binding functions.

### What this PR does / why we need it?
Controls the use of memory on a closer NUMA node to achieve a lower
memory access latency, while binding interrupts to different CPU cores
to prevent them form interrupting the inference process.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?


b8eaaa073b

Signed-off-by: rowzwel_dx <1392851715@qq.com>

Signed-off-by: Rozwel-dx <1392851715@qq.com>
- vLLM version: v0.15.0
- vLLM main:
9562912cea

Signed-off-by: Rozwel-dx <1392851715@qq.com>
This commit is contained in:
Rozwel-dx
2026-02-26 08:49:50 +08:00
committed by GitHub
parent 3a4292e5b7
commit a9cca0c5c4
2 changed files with 86 additions and 5 deletions

View File

@@ -162,11 +162,11 @@ class TestCpuAlloc(unittest.TestCase):
@patch('vllm_ascend.cpu_binding.execute_command') @patch('vllm_ascend.cpu_binding.execute_command')
def test_allocate(self, mock_execute_command): def test_allocate(self, mock_execute_command):
self.cpu_alloc.device_info.running_npu_list = [0] self.cpu_alloc.device_info.running_npu_list = [0]
self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2]} self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2, 3, 4]}
self.cpu_alloc.allocate() self.cpu_alloc.allocate()
self.assertEqual(self.cpu_alloc.assign_main[0], [0]) self.assertEqual(self.cpu_alloc.assign_main[0], [2])
self.assertEqual(self.cpu_alloc.assign_acl[0], [1]) self.assertEqual(self.cpu_alloc.assign_acl[0], [3])
self.assertEqual(self.cpu_alloc.assign_rel[0], [2]) self.assertEqual(self.cpu_alloc.assign_rel[0], [4])
self.cpu_alloc.npu_cpu_pool = {0: [0, 1]} self.cpu_alloc.npu_cpu_pool = {0: [0, 1]}
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
self.cpu_alloc.allocate() self.cpu_alloc.allocate()

View File

@@ -2,6 +2,7 @@
import os import os
import platform import platform
import shutil
import subprocess import subprocess
from collections import defaultdict from collections import defaultdict
@@ -10,6 +11,7 @@ from vllm.logger import logger
from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type
MASK_BIT = 32 # Number of bits in a CPU affinity mask group
ALLOWED_CPUS_PATH = "/proc/self/status" ALLOWED_CPUS_PATH = "/proc/self/status"
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES") ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
@@ -127,6 +129,16 @@ class CpuAlloc:
self.assign_acl: dict[int, list[int]] = {} self.assign_acl: dict[int, list[int]] = {}
self.assign_rel: dict[int, list[int]] = {} self.assign_rel: dict[int, list[int]] = {}
@staticmethod
def cpu_to_mask(cpu: int) -> str:
group = cpu // MASK_BIT
bit = cpu % MASK_BIT
value = 1 << bit
mask = f"{value:08x}"
for _ in range(1, group + 1):
mask = f"{mask},{'0' * (MASK_BIT // 4)}"
return mask
@staticmethod @staticmethod
def get_threads_map(thread_message: str) -> dict[str, dict[str, list[str]]]: def get_threads_map(thread_message: str) -> dict[str, dict[str, list[str]]]:
threads_map: dict[str, dict[str, list[str]]] = {} threads_map: dict[str, dict[str, list[str]]] = {}
@@ -270,7 +282,7 @@ class CpuAlloc:
def allocate(self) -> None: def allocate(self) -> None:
for npu, pool in self.npu_cpu_pool.items(): for npu, pool in self.npu_cpu_pool.items():
if len(pool) >= 3: if len(pool) >= 3:
main = pool[:-2] main = pool[2:-2] # Reserve first two CPUs for IRQ binding
acl = [pool[-2]] acl = [pool[-2]]
rel = [pool[-1]] rel = [pool[-1]]
else: else:
@@ -289,6 +301,17 @@ class CpuAlloc:
rel = str(self.assign_rel[current_npu]) if self.assign_rel[current_npu] else "" rel = str(self.assign_rel[current_npu]) if self.assign_rel[current_npu] else ""
logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]") logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]")
def bind_memory(self, pid: str, npu: int) -> None:
if not shutil.which("migratepages"):
logger.info("The 'migratepages' command is not available, skipping memory binding.")
return
all_numa_nodes = sorted(self.numa_to_cpu_map.keys())
target_cpu = self.assign_acl[npu][0]
target_numa = self.cpu_node[target_cpu]
bind_numa_list = [target_numa, target_numa + 1 if target_numa % 2 == 0 else target_numa - 1]
logger.info(f"[migrate] rank:{self.rank_id} -> NUMA {bind_numa_list}")
execute_command(["migratepages", pid, ",".join(map(str, all_numa_nodes)), ",".join(map(str, bind_numa_list))])
def bind_threads(self) -> None: def bind_threads(self) -> None:
thread_message, _ = execute_command(["ps", "-Te"]) thread_message, _ = execute_command(["ps", "-Te"])
threads_map = self.get_threads_map(thread_message) threads_map = self.get_threads_map(thread_message)
@@ -297,14 +320,72 @@ class CpuAlloc:
self.bind(main_pid, self.assign_main[current_npu], True) self.bind(main_pid, self.assign_main[current_npu], True)
for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []): for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []):
self.bind(acl_thread, self.assign_acl[current_npu], False) self.bind(acl_thread, self.assign_acl[current_npu], False)
self.bind_memory(acl_thread, current_npu)
for release_thread in threads_map.get(main_pid, {}).get("release_thread", []): for release_thread in threads_map.get(main_pid, {}).get("release_thread", []):
self.bind(release_thread, self.assign_rel[current_npu], False) self.bind(release_thread, self.assign_rel[current_npu], False)
def bind_npu_irq(self) -> None:
if not os.access("/proc/irq", os.W_OK):
return
if shutil.which("systemctl"):
output, _ = execute_command(["systemctl", "list-unit-files"])
if "irqbalance.service" in output:
_, return_code = execute_command(["systemctl", "is-active", "--quiet", "irqbalance"])
if return_code == 0:
logger.warning(
"The irqbalance service is running and has been stopped. "
"You can run the systemctl start irqbalance command to restart it."
)
execute_command(["systemctl", "stop", "irqbalance"])
sq_irqs = []
with open("/proc/interrupts") as f:
for line in f:
if "sq_send_trigger_irq" in line:
irq = line.split(":")[0].strip()
sq_irqs.append(irq)
for npu in sorted(self.npu_cpu_pool.keys()):
cpus = self.npu_cpu_pool[npu]
if len(cpus) < 2:
continue
sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding
info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(npu)])
pci_addr = ""
for line in info.splitlines():
if "PCIe Bus Info" in line:
pci_addr = line.split()[-1].lower()
break
if not pci_addr:
logger.warning(f"Can't find pci address of NPU{npu} .")
continue
try:
npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x))
except FileNotFoundError:
logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .")
continue
sq_irq, cq_irq = "", ""
for irq in sq_irqs:
if irq in npu_irq_list:
sq_irq = irq
cq_irq = str(int(irq) + 1)
break
if not sq_irq:
logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.")
continue
logger.info(
f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, "
f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}"
)
with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(sq_cpu))
with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f:
f.write(self.cpu_to_mask(cq_cpu))
def run_all(self) -> None: def run_all(self) -> None:
self.build_cpu_pools() self.build_cpu_pools()
self.allocate() self.allocate()
self.print_plan() self.print_plan()
self.bind_threads() self.bind_threads()
self.bind_npu_irq()
def bind_cpus(rank_id: int) -> None: def bind_cpus(rank_id: int) -> None: