diff --git a/tests/ut/device_allocator/test_cpu_binding.py b/tests/ut/device_allocator/test_cpu_binding.py new file mode 100644 index 00000000..4f5cb114 --- /dev/null +++ b/tests/ut/device_allocator/test_cpu_binding.py @@ -0,0 +1,167 @@ +import unittest +from unittest.mock import patch + +from vllm_ascend.cpu_binding import CpuAlloc, DeviceInfo + + +class TestDeviceInfo(unittest.TestCase): + + @patch('vllm_ascend.cpu_binding.execute_command') + def setUp(self, mock_execute_command): + mock_execute_command.side_effect = [ + ("NPU ID Chip ID Chip Logic ID Chip Name\n0 0 0 Ascend\n0 1 - Mcu\n1 0 1 Ascend", + 0), + ("| NPU Chip | Process id |\n| 0 0 | 1234 | vllm | 56000 |\n| 1 0 | 1235 | vllm | 56000 |", + 0), ("", 0) + ] + self.device_info = DeviceInfo() + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_get_npu_map_info(self, mock_execute_command): + execute_result_list = [ + ("NPU ID Chip ID Chip Logic ID Chip Phy-ID Chip Name\n0 0 0 0 Ascend\n0 1 1 1 Ascend\n0 2 - - Mcu", + 0), + ("NPU ID Chip ID Chip Logic ID Chip Name\n8 0 0 Ascend\n8 1 - Mcu\n9 0 1 Ascend", + 0), + ] + result_list = [{ + '0': { + '0': '0', + '1': '1' + } + }, { + '8': { + '0': '0' + }, + '9': { + '0': '1' + } + }] + for result in execute_result_list: + mock_execute_command.return_value = result + npu_map_info = self.device_info.get_npu_map_info() + expected = result_list.pop(0) + self.assertEqual(npu_map_info, expected) + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_get_running_npus(self, mock_execute_command): + mock_execute_command.side_effect = [ + ("| NPU Chip | Process id |\n| 0 1 | 1236 | vllm | 56000 |", 0), + ("", 0), + ("| NPU Chip | Process id |\n| 1 0 | 1236 | vllm | 56000 |", 0) + ] + with self.assertRaises(RuntimeError): + self.device_info.get_running_npus() + with self.assertRaises(RuntimeError): + self.device_info.get_running_npus() + running_npus = self.device_info.get_running_npus() + self.assertEqual(len(running_npus), 1) + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_parse_topo_affinity(self, mock_execute_command): + mock_execute_command.return_value = ( + "NPU0 X HCCS HCCS HCCS HCCS HCCS HCCS HCCS 0-3", 0) + affinity = self.device_info.parse_topo_affinity() + expected = {0: [0, 1, 2, 3]} + self.assertEqual(affinity, expected) + + def test_expand_cpu_list(self): + result = self.device_info.expand_cpu_list("0-2, 4, 6-8") + self.assertEqual(result, [0, 1, 2, 4, 6, 7, 8]) + + +class TestCpuAlloc(unittest.TestCase): + + @patch('vllm_ascend.cpu_binding.execute_command') + def setUp(self, mock_execute_command): + mock_execute_command.side_effect = [ + ("NPU ID Chip ID Chip Logic ID Chip Name\n0 0 0 Ascend\n0 1 - Mcu\n1 0 1 Ascend", + 0), + ("| NPU Chip | Process id |\n| 0 0 | 1234 | vllm | 56000 |\n| 1 0 | 1235 | vllm | 56000 |", + 0), ("", 0) + ] + self.cpu_alloc = CpuAlloc(0) + + def test_average_distribute(self): + self.cpu_alloc.npu_cpu_pool = { + 0: [10, 11, 12, 13], + 1: [10, 11, 12, 13] + } + groups = {"[10, 11, 12, 13]": [0, 1]} + result = self.cpu_alloc.average_distribute(groups) + self.assertEqual(result, {0: [10, 11], 1: [12, 13]}) + self.cpu_alloc.npu_cpu_pool = { + 0: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], + 1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], + 2: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13] + } + groups = {"[0, 1, 2, 3, 4, 5]": [0, 1, 2]} + result = self.cpu_alloc.average_distribute(groups) + self.assertEqual(result, { + 0: [0, 1, 2, 3], + 1: [4, 5, 6, 7], + 2: [8, 9, 10, 11, 12, 13] + }) + + def 
test_extend_numa(self): + result = self.cpu_alloc.extend_numa([]) + self.assertEqual(result, []) + self.cpu_alloc.cpu_node = {0: 0, 1: 0, 2: 1, 3: 1} + self.cpu_alloc.numa_to_cpu_map = {0: [0, 1], 1: [2, 3]} + self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3] + result = self.cpu_alloc.extend_numa([0, 1]) + self.assertEqual(result, [0, 1, 2, 3]) + self.cpu_alloc.device_info.allowed_cpus = [0, 1, 3] + result = self.cpu_alloc.extend_numa([0, 1]) + self.assertEqual(result, [0, 1, 3]) + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_build_cpu_node_map(self, mock_execute_command): + mock_execute_command.return_value = ("", 0) + with self.assertRaises(RuntimeError): + self.cpu_alloc.build_cpu_node_map() + mock_execute_command.return_value = ("0 0\n1 1\n2 0\n3 1", 0) + self.cpu_alloc.build_cpu_node_map() + expected_cpu_node = {0: 0, 1: 1, 2: 0, 3: 1} + expected_numa_to_cpu_map = {0: [0, 2], 1: [1, 3]} + self.assertEqual(self.cpu_alloc.cpu_node, expected_cpu_node) + self.assertEqual(self.cpu_alloc.numa_to_cpu_map, + expected_numa_to_cpu_map) + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_handle_no_affinity(self, mock_execute_command): + mock_execute_command.side_effect = [("0 0\n1 1", 0), ("0 0\n1 1", 0)] + self.cpu_alloc.device_info.running_npu_list = [0, 1] + self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3] + self.cpu_alloc.device_info.npu_affinity = {} + self.assertEqual(self.cpu_alloc.npu_cpu_pool, {}) + self.cpu_alloc.build_cpu_pools() + self.assertEqual(len(self.cpu_alloc.npu_cpu_pool), 2) + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_allocate(self, mock_execute_command): + self.cpu_alloc.device_info.running_npu_list = [0] + self.cpu_alloc.npu_cpu_pool = {0: [0, 1, 2]} + self.cpu_alloc.allocate() + self.assertEqual(self.cpu_alloc.assign_main[0], [0]) + self.assertEqual(self.cpu_alloc.assign_acl[0], [1]) + self.assertEqual(self.cpu_alloc.assign_rel[0], [2]) + self.cpu_alloc.npu_cpu_pool = {0: [0, 1]} + with self.assertRaises(RuntimeError): + self.cpu_alloc.allocate() + + @patch('vllm_ascend.cpu_binding.execute_command') + def test_bind_threads(self, mock_execute_command): + thread_message = "1234 1234 ? 
00:00:03 release_thread" + mock_execute_command.return_value = (thread_message, 0) + self.cpu_alloc.device_info.running_npu_list = [0] + self.cpu_alloc.assign_main = {0: [0, 1]} + self.cpu_alloc.assign_acl = {0: [2]} + self.cpu_alloc.assign_rel = {0: [3]} + self.cpu_alloc.bind_threads() + mock_execute_command.assert_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py index 280e516e..9bcd5319 100644 --- a/vllm_ascend/cpu_binding.py +++ b/vllm_ascend/cpu_binding.py @@ -1,330 +1,319 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + import os import subprocess -from dataclasses import dataclass -from itertools import accumulate -from typing import Dict, List, Optional, Tuple, Union +from collections import defaultdict +from typing import Dict, List, Tuple import psutil -import torch_npu from vllm.logger import logger +ALLOWED_CPUS_PATH = "/proc/self/status" ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES") -CPU_BINDING_NUM = os.getenv("CPU_BINDING_NUM") -def execute_command(cmd_list): - with subprocess.Popen(cmd_list, +def execute_command(cmd: List[str]) -> Tuple[str, int]: + with subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: - out, err = p.communicate(timeout=1000) - res = out.decode() - return res + out, _ = p.communicate(timeout=1000) + return out.decode(), p.returncode -@dataclass class DeviceInfo: - """ - Parse a single line of device information into structured data. - """ - _info_line: str = "" - npu_id: int = 0 - chip_id: int = 0 - chip_logic_id: Union[int, str] = 0 - chip_name: str = "" - def __post_init__(self): - npu_id_str, chip_id_str, chip_logic_id_str, self.chip_name = self._info_line.strip( - ).split(None, 3) - self.npu_id = int(npu_id_str) - self.chip_id = int(chip_id_str) - if chip_logic_id_str.isnumeric(): - self.chip_logic_id = int(chip_logic_id_str) + def __init__(self): + self.npu_map_info: Dict[str, Dict[str, str]] = self.get_npu_map_info() + self.allowed_cpus: List[int] = self.parse_allowed_cpus() + self.running_npu_list: List[int] = self.get_running_npus() + self.npu_affinity: Dict[int, List[int]] = self.parse_topo_affinity() + @staticmethod + def expand_cpu_list(allowed_list_str: str) -> List[int]: + allowed_cpus_list: List[int] = [] + for per_range in allowed_list_str.split(","): + if "-" in per_range: + start_cpu, end_cpu = map(int, per_range.split("-")) + allowed_cpus_list.extend(range(start_cpu, end_cpu + 1)) + else: + allowed_cpus_list.append(int(per_range)) + return allowed_cpus_list -class NpuHbmInfo: - visible_npu_ids: Optional[List[int]] = None - hbm_capacity: Optional[int] = None - hbm_usage: Optional[int] = None + @staticmethod + def get_npu_map_info() -> Dict[str, Dict[str, str]]: + npu_map_info: Dict[str, Dict[str, str]] = {} + npu_info, _ = execute_command(["npu-smi", "info", "-m"]) + npu_map = npu_info.strip().split("\n")[1:] + for line in npu_map: + npu_id, chip_id, chip_logic_id = line.strip().split()[:3] + if not chip_logic_id.isdigit(): + continue + if npu_id not in npu_map_info: + npu_map_info[npu_id] = {} + npu_map_info[npu_id][chip_id] = chip_logic_id + return npu_map_info - @classmethod - def set_visible_devices(cls, world_size): - """ - Determine which NPUs are visible to the current process and cache their - logical NPU IDs in `cls.visible_npu_ids`. 
- """ - if cls.visible_npu_ids: - return - if ASCEND_RT_VISIBLE_DEVICES is None: - devices = sorted(list(_get_device_map_info().keys())) - else: + def get_running_npus(self) -> List[int]: + npu_message, _ = execute_command(["npu-smi", "info"]) + in_proc_section = False + running_npu_set = set() + for line in npu_message.splitlines(): + line = line.strip() + if line.startswith("| NPU") and "Process id" in line: + in_proc_section = True + continue + if not in_proc_section: + continue + if line.startswith("| "): + parts = [p.strip() for p in line.strip("|").split("|")] + if len(parts) < 2: + continue + npu_id = parts[0].split()[0] + chip_id = parts[0].split()[1] + if not npu_id.isdigit() or not chip_id.isdigit(): + continue + chip_logic_id = self.npu_map_info.get(npu_id, {}).get(chip_id) + if not chip_logic_id or not chip_logic_id.isdigit(): + raise RuntimeError( + "Failed to get correct chip_logic_id from command 'npu-smi info -m'." + ) + running_npu_set.add(int(chip_logic_id)) + if ASCEND_RT_VISIBLE_DEVICES: devices_str = ASCEND_RT_VISIBLE_DEVICES - devices = [int(x) for x in devices_str.split(",")] - device_map_info = _get_device_map_info() - npu_ids = [] - for device in devices: - device_info = device_map_info.get(device) - if device_info is None: + devices_list = [int(x) for x in devices_str.split(",")] + running_npu_set = set(devices_list) & running_npu_set + if not running_npu_set: + raise RuntimeError( + "Can not get running npu info, you can use BIND_CPU=0 to skip." + ) + return sorted(running_npu_set) + + def parse_allowed_cpus(self) -> List[int]: + if not os.path.exists(ALLOWED_CPUS_PATH): + return [] + with open(ALLOWED_CPUS_PATH) as f: + for line in f: + if line.startswith("Cpus_allowed_list"): + return self.expand_cpu_list(line.split()[1]) + raise RuntimeError( + "Can not found specific 'Cpus_allowed_list' in the '/proc/self/status' file." 
+ ) + + def parse_topo_affinity(self) -> Dict[int, List[int]]: + chip_logic_id = 0 + affinity: Dict[int, List[int]] = {} + affinity_message, _ = execute_command( + ["npu-smi", "info", "-t", "topo"]) + for line in affinity_message.splitlines(): + if line.startswith("NPU"): + parts = line.split() + last_part = parts[-1] + if last_part != "Affinity": + affinity[chip_logic_id] = self.expand_cpu_list(last_part) + chip_logic_id += 1 + return affinity + + +class CpuAlloc: + + def __init__(self, rank_id: int): + self.rank_id = rank_id + self.device_info: DeviceInfo = DeviceInfo() + self.cpu_node: Dict[int, int] = {} + self.numa_to_cpu_map: Dict[int, List[int]] = defaultdict(list) + self.npu_cpu_pool: Dict[int, List[int]] = {} + self.assign_main: Dict[int, List[int]] = {} + self.assign_acl: Dict[int, List[int]] = {} + self.assign_rel: Dict[int, List[int]] = {} + + @staticmethod + def get_threads_map( + thread_message: str) -> Dict[str, Dict[str, List[str]]]: + threads_map: Dict[str, Dict[str, List[str]]] = {} + for line in thread_message.splitlines(): + parts = line.split() + if len(parts) < 2: + continue + main_pid, sub_pid = parts[0], parts[1] + if "acl_thread" in line: + key = "acl_thread" + elif "release_thread" in line: + key = "release_thread" + else: + continue + if main_pid not in threads_map: + threads_map[main_pid] = { + "acl_thread": [], + "release_thread": [] + } + threads_map[main_pid][key].append(sub_pid) + return threads_map + + @staticmethod + def bind(pid: str, cpus: List[int], bind_sub_thread: bool) -> None: + if cpus: + cpu_list = ",".join(map(str, cpus)) + if bind_sub_thread: + _, return_code = execute_command( + ["taskset", "-acp", cpu_list, pid]) + else: + _, return_code = execute_command( + ["taskset", "-cp", cpu_list, pid]) + if return_code != 0: + raise RuntimeError(f"Failed to bind {pid} to CPU {cpu_list}.") + + def average_distribute( + self, groups: Dict[str, List[int]]) -> Dict[int, List[int]]: + result: Dict[int, List[int]] = {} + for key, npu_list in groups.items(): + cpu_list = sorted(self.npu_cpu_pool[npu_list[0]]) + cpu_num_per_npu = len(cpu_list) // len(npu_list) + for i, npu in enumerate(npu_list): + start_index = i * cpu_num_per_npu + end_index = (i + 1) * cpu_num_per_npu if i < len( + npu_list) - 1 else len(cpu_list) + result[npu] = cpu_list[start_index:end_index] + return result + + def extend_numa(self, cpu_list: List[int]) -> List[int]: + if not cpu_list: + return [] + nodes = {self.cpu_node[c] for c in cpu_list} + if len(nodes) != 1: + return cpu_list + node = list(nodes)[0] + next_node = (node + 1) % len(self.numa_to_cpu_map) + extended = cpu_list[:] + for cpu in self.numa_to_cpu_map[next_node]: + if cpu in self.device_info.allowed_cpus: + extended.append(cpu) + return sorted(set(extended)) + + def build_cpu_node_map(self) -> None: + cpu_numa_map, _ = execute_command(["lscpu", "-e=CPU,NODE"]) + for line in cpu_numa_map.splitlines(): + line = line.strip() + if not line or not line[0].isdigit(): + continue + cpu_str, node_str = line.split() + cpu = int(cpu_str) + node = int(node_str) + self.cpu_node[cpu] = node + self.numa_to_cpu_map[node].append(cpu) + if len(self.numa_to_cpu_map) == 0: + raise RuntimeError( + "lscpu output error: no NUMA node available."
+ ) + + def handle_no_affinity(self) -> None: + num_running_npu = len(self.device_info.running_npu_list) + num_numa_node = len(self.numa_to_cpu_map) + if num_numa_node == 0 or num_running_npu == 0: + return + if num_running_npu % num_numa_node != 0: + npu_num_per_node = num_running_npu // num_numa_node + 1 + else: + npu_num_per_node = num_running_npu // num_numa_node + index = 0 + for node in sorted(self.numa_to_cpu_map): + # Available CPUs on this NUMA (constrained by allowed_cpus) + cpus = [ + c for c in self.numa_to_cpu_map[node] + if c in self.device_info.allowed_cpus + ] + if not cpus: + continue + # The actual number of NPUs to be allocated on this NUMA. + npu_num_this_node = min(npu_num_per_node, num_running_npu - index) + if npu_num_this_node <= 0: + break + # Evenly distribute the CPUs of this NUMA node among npu_num_this_node NPUs. + total_cpu_num = len(cpus) + base_cpu_num = total_cpu_num // npu_num_this_node + extra_cpu_num = total_cpu_num % npu_num_this_node + start_index = 0 + for i in range(npu_num_this_node): + take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0) + end_index = start_index + take_cpu_num + select_cpus_list = cpus[start_index:end_index] + if index < num_running_npu: + npu = self.device_info.running_npu_list[index] + self.npu_cpu_pool[npu] = select_cpus_list + index += 1 + start_index = end_index + + def build_cpu_pools(self) -> None: + self.build_cpu_node_map() + if not self.device_info.npu_affinity: + self.handle_no_affinity() + return + for npu in self.device_info.running_npu_list: + base_cpu_list = [ + cpu for cpu in self.device_info.npu_affinity.get(npu, []) + if cpu in self.device_info.allowed_cpus + ] + if not base_cpu_list: raise RuntimeError( - f"Device {device} not found in device_map_info") - npu_ids.append(device_info.npu_id) - cls.visible_npu_ids = npu_ids + "CPUs available in 'Cpus_allowed_list' conflict with NUMA affinity." + ) + extra_cpu_list = self.extend_numa(base_cpu_list) + self.npu_cpu_pool[npu] = extra_cpu_list + groups = defaultdict(list) + for npu, cpus in self.npu_cpu_pool.items(): + groups[str(cpus)].append(npu) + final: Dict[int, List[int]] = {} + for key, npu_list in groups.items(): + if len(npu_list) == 1: + final[npu_list[0]] = self.npu_cpu_pool[npu_list[0]] + else: + final.update(self.average_distribute({key: npu_list})) + self.npu_cpu_pool = final - @classmethod - def get_hbm_capacity(cls, rank, world_size, need_nz): - """ - Query and cache the HBM (or DDR) capacity in **bytes** for the NPU assigned - to the current process. 
- """ - soc_version = torch_npu._C._npu_get_soc_version() - if cls.hbm_capacity: - return cls.hbm_capacity - if not cls.visible_npu_ids: - cls.set_visible_devices(world_size) - assert cls.visible_npu_ids is not None - npu_id = cls.visible_npu_ids[rank] - memory_info = execute_command( - ["npu-smi", "info", "-i", f"{npu_id}", "-t", - "memory"]).split("\n")[1:] - if soc_version == 240: - hbm_capacity_key = 'Capacity(MB)' - elif not need_nz: - hbm_capacity_key = 'HBM Capacity(MB)' - else: - hbm_capacity_key = 'DDR Capacity(MB)' - for line in memory_info: - try: - key, value = line.strip().split(':', 2) - if key.strip() == hbm_capacity_key: - cls.hbm_capacity = int(value.strip()) * 1024 * 1024 - return cls.hbm_capacity - except ValueError: - pass - raise ValueError('not found valid hbm capactiy') + def allocate(self) -> None: + for npu, pool in self.npu_cpu_pool.items(): + if len(pool) >= 3: + main = pool[:-2] + acl = [pool[-2]] + rel = [pool[-1]] + else: + raise RuntimeError( + "The number of CPUs is insufficient to bind to the NPUs. " + "Each NPU requires at least 3 CPUs.") + self.assign_main[npu] = main + self.assign_acl[npu] = acl + self.assign_rel[npu] = rel - @classmethod - def get_hbm_usage(cls, rank, world_size, need_nz): - """ - Return the current HBM or DDR usage - ratio (0-1) for the NPU assigned to the given rank. - """ - if cls.hbm_usage: - return cls.hbm_usage - if not cls.visible_npu_ids: - cls.set_visible_devices(world_size) - assert cls.visible_npu_ids is not None - npu_id = cls.visible_npu_ids[rank] - usage_info = execute_command( - ["npu-smi", "info", "-i", f"{npu_id}", "-t", - "usages"]).split("\n")[1:] - soc_version = torch_npu._C._npu_get_soc_version() - if soc_version == 240: - hbm_capacity_key = 'Memory Usage Rate(%)' - elif not need_nz: - hbm_capacity_key = 'HBM Usage Rate(%)' - else: - hbm_capacity_key = 'DDR Usage Rate(%)' - for line in usage_info: - try: - key, value = line.strip().split(':', 2) - if key.strip() == hbm_capacity_key: - hbm_usage = (float(value.strip()) + 1) / 100 - return hbm_usage - except ValueError: - pass - raise ValueError('not found valid hbm usage') + def print_plan(self) -> None: + logger.info("The CPU allocation plan is as follows:") + current_npu = self.device_info.running_npu_list[self.rank_id] + main = " ".join(map(str, self.assign_main[current_npu])) + acl = " ".join(map(str, self.assign_acl[current_npu])) + rel = str(self.assign_rel[current_npu] + ) if self.assign_rel[current_npu] else "" + logger.info( + f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]") + + def bind_threads(self) -> None: + thread_message, _ = execute_command(["ps", "-Te"]) + threads_map = self.get_threads_map(thread_message) + main_pid = str(psutil.Process().pid) + current_npu = self.device_info.running_npu_list[self.rank_id] + self.bind(main_pid, self.assign_main[current_npu], True) + for acl_thread in threads_map.get(main_pid, {}).get("acl_thread", []): + self.bind(acl_thread, self.assign_acl[current_npu], False) + for release_thread in threads_map.get(main_pid, + {}).get("release_thread", []): + self.bind(release_thread, self.assign_rel[current_npu], False) + + def run_all(self) -> None: + self.build_cpu_pools() + self.allocate() + self.print_plan() + self.bind_threads() -def _get_device_map_info() -> Dict[int, DeviceInfo]: - """ - Build and return a mapping from logical chip ID (int) to its DeviceInfo object. 
- """ - device_map_info = {} - device_map = execute_command(["npu-smi", "info", - "-m"]).strip().split("\n")[1:] - for line in device_map: - device_info = DeviceInfo(line.strip()) - if isinstance(device_info.chip_logic_id, int): - device_map_info[device_info.chip_logic_id] = device_info - return device_map_info - - -def _get_pcie_info(devices: List[int], keyword="PCIeBusInfo"): - """ - Query each NPU in the given device list and return a mapping - from logical device ID to its PCIe bus address. - """ - device_map_info = _get_device_map_info() - device_pcie_tbl = {} - for device in devices: - device_info = device_map_info.get(device) - if not device_info: - raise RuntimeError( - "Can not get device info, you can use BIND_CPU=0 to skip.") - pcie_info = execute_command([ - "npu-smi", "info", "-t", "board", "-i", f"{device_info.npu_id}", - "-c", f"{device_info.chip_id}" - ]).strip().split("\n") - for _ in pcie_info: - line = ''.join(_.split()) - if line.startswith(keyword): - device_pcie_tbl[device] = line[len(keyword) + 1:] - break - - return device_pcie_tbl - - -def _get_numa_info(pcie_tbl, keyword="NUMAnode"): - """ - Build two mappings: device → NUMA node, and NUMA node → [devices]. - """ - device_numa_tbl: Dict[int, int] = {} # device id -> numa id - numa_devices_tbl: Dict[int, List[int]] = {} # numa id -> device ids - - for device, pcie_no in pcie_tbl.items(): - numa_info = execute_command(["lspci", "-s", f"{pcie_no}", - "-vvv"]).split("\n") - for _ in numa_info: - line = ''.join(_.split()) - if line.startswith(keyword): - numa_id = int(line[len(keyword) + 1:]) - device_numa_tbl[device] = numa_id - - devices = numa_devices_tbl.get(numa_id, None) - if devices is None: - numa_devices_tbl[numa_id] = list() - - numa_devices_tbl[numa_id].append(device) - break - - return device_numa_tbl, numa_devices_tbl - - -def _get_numa_info_v2( - devices: List[int], - keyword="NUMAnode(s)") -> Tuple[Dict[int, int], Dict[int, List[int]]]: - """ - Evenly distribute the given device list across all NUMA nodes and return - both device-to-numa and numa-to-devices mappings. - """ - numa_nodes = 1 - numa_info = execute_command(["lscpu"]).split("\n") - for _ in numa_info: - line = ''.join(_.split()) - if keyword not in line: - continue - numa_nodes = int(line[-1]) - break - - device_per_numa, tail_device = divmod(len(devices), numa_nodes) - device_count_per_numa_list = [ - device_per_numa + (i < tail_device) for i in range(numa_nodes) - ] - - ends = list(accumulate(device_count_per_numa_list)) - starts = [0] + ends[:-1] - - numa_devices_tbl = { - ind: devices[start:end] - for ind, (start, end) in enumerate(zip(starts, ends)) - } - - device_numa_tbl = { - device: numa - for numa, _devices in numa_devices_tbl.items() - for device in _devices - } - - return device_numa_tbl, numa_devices_tbl - - -def _get_cpu_info(numa_ids, keyword1="NUMAnode", keyword2="CPU(s)"): - """ - Parse lscpu output to build a dict that maps each NUMA - node ID to the list of CPU core IDs belonging to it. 
- """ - cpu_idx_tbl = dict() - numa_keywords = [keyword1 + str(idx) + keyword2 for idx in numa_ids] - cpu_info = execute_command(["lscpu"]).split("\n") - for _ in cpu_info: - line = ''.join(_.split()) - if any(line.startswith(word) for word in numa_keywords): - split_info = line.split(":") - cpu_id_ranges = split_info[-1].split(",") - - ranges = list() - for range_str in cpu_id_ranges: - endpoints = range_str.split("-") - if len(endpoints) != 2: - raise Exception( - "lscpu command output error, please check !") - - ranges += [ - cid for cid in range(int(endpoints[0]), - int(endpoints[1]) + 1) - ] - - numa_id = int(split_info[0].replace(keyword1, - '').replace(keyword2, '')) - cpu_idx_tbl[numa_id] = ranges - return cpu_idx_tbl - - -def bind_cpus(rank_id, ratio=0.5): - # get all visible devices - visible_devices = ASCEND_RT_VISIBLE_DEVICES - - if visible_devices is None: - devices = sorted(list(_get_device_map_info().keys())) - else: - devices = [int(x) for x in visible_devices.split(",")] - - # Query the NUMA affinity of each NPU via its PCIe address; if this fails, - # fall back to evenly distributing the devices across NUMA nodes. - device_pcie_tbl = _get_pcie_info(devices) - device_numa_tbl, numa_devices_tbl = _get_numa_info(device_pcie_tbl) - if not device_numa_tbl or not numa_devices_tbl: - device_numa_tbl, numa_devices_tbl = _get_numa_info_v2(devices) - - # Obtain the complete list of CPU cores for each NUMA node. - cpu_idx_tbl = _get_cpu_info(list(numa_devices_tbl.keys())) - - cur_device = devices[rank_id] - numa_id = device_numa_tbl.get(cur_device) - - # Within the NUMA node, evenly partition the CPU cores - # among all NPUs (or use the amount specified by CPU_BINDING_NUM) - shard_devices = numa_devices_tbl.get(numa_id) - shard_devices.sort() - - all_cpus = cpu_idx_tbl.get(numa_id) - logger.info( - f"rank_id: {rank_id}, device_id: {cur_device}, " - f"numa_id: {numa_id}, shard_devices: {shard_devices}, cpus: {all_cpus}" - ) - - cpu_nums = len(all_cpus) - if CPU_BINDING_NUM is None: - cpu_num_per_device = int(cpu_nums * ratio // len(shard_devices)) - else: - cpu_num_per_device = int(CPU_BINDING_NUM) - if len(shard_devices) * cpu_num_per_device > cpu_nums: - raise RuntimeError( - f"Cpu num in numa {numa_id} to assign {cpu_num_per_device} for every device is not enough, " - f"please decrease the value of CPU_BINDING_NUM!") - if cpu_num_per_device < 0: - raise ValueError("CPU_BINDING_NUM should not be less than 0.") - - idx = shard_devices.index(cur_device) - binding_cpus = [ - all_cpus[_] for _ in range(idx * cpu_num_per_device, (idx + 1) * - cpu_num_per_device) - ] - - # cpu bind - p = psutil.Process() - p.cpu_affinity(binding_cpus) - new_affinity = p.cpu_affinity() - logger.info( - f"process {p.pid}, new_affinity is {new_affinity}, cpu count {cpu_num_per_device}" - ) +def bind_cpus(rank_id: int) -> None: + binder = CpuAlloc(rank_id) + binder.run_all() diff --git a/vllm_ascend/worker/worker.py b/vllm_ascend/worker/worker.py index 0094a0eb..2da979e6 100644 --- a/vllm_ascend/worker/worker.py +++ b/vllm_ascend/worker/worker.py @@ -115,17 +115,6 @@ class NPUWorker(WorkerBase): distributed_init_method=distributed_init_method, is_driver_worker=is_driver_worker) - # binding cpu - if get_ascend_config().enable_cpu_binding: - try: - bind_cpus(self.local_rank, ratio=1.0) - except RuntimeError as e: - logger.error(f"{e} in {self.local_rank}") - except ValueError as e: - logger.error(f"{e} in {self.local_rank}") - except Exception: - logger.info("Skip binding cpu.") - if 
self.cache_config.cache_dtype == "auto": self.cache_dtype = self.model_config.dtype else: @@ -238,6 +227,15 @@ class NPUWorker(WorkerBase): set_random_seed(self.model_config.seed) # Initialize device properties used by triton kernels. init_device_properties_triton() + + # Bind CPUs to this rank's NPU-affine cores. + if get_ascend_config().enable_cpu_binding: + try: + bind_cpus(self.local_rank) + except Exception as e: + logger.warning( + f"CPU binding failed on rank {self.local_rank}: {e}. Skipping CPU binding." + ) return device def init_device(self):
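
Note for reviewers: the core policy in `CpuAlloc.allocate()` reserves the last two CPUs of each NPU's pool for the `acl_thread` and `release_thread` and gives the remainder to the main process. The standalone sketch below mirrors that split so it can be run without Ascend hardware or `npu-smi`; the function and variable names here are illustrative, not part of the shipped module.

```python
from typing import Dict, List, Tuple

Assign = Dict[int, List[int]]


def allocate(npu_cpu_pool: Dict[int, List[int]]) -> Tuple[Assign, Assign, Assign]:
    """Mirror of the PR's split: main gets all but the last two CPUs."""
    assign_main: Assign = {}
    assign_acl: Assign = {}
    assign_rel: Assign = {}
    for npu, pool in npu_cpu_pool.items():
        if len(pool) < 3:
            # Same constraint the PR enforces: at least 3 CPUs per NPU.
            raise RuntimeError("Each NPU requires at least 3 CPUs.")
        assign_main[npu] = pool[:-2]  # bulk of the pool -> main process
        assign_acl[npu] = [pool[-2]]  # one CPU -> acl_thread
        assign_rel[npu] = [pool[-1]]  # one CPU -> release_thread
    return assign_main, assign_acl, assign_rel


if __name__ == "__main__":
    main, acl, rel = allocate({0: [0, 1, 2, 3], 1: [4, 5, 6, 7]})
    print(main)  # {0: [0, 1], 1: [4, 5]}
    print(acl)   # {0: [2], 1: [6]}
    print(rel)   # {0: [3], 1: [7]}
```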
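Similarly, `extend_numa()` widens a pool that is confined to a single NUMA node with the allowed CPUs of the next node (wrapping around), while a pool that already spans several nodes is left unchanged. A minimal sketch, assuming made-up CPU-to-node maps (the dictionaries below are examples, not real topology):

```python
from typing import Dict, List


def extend_numa(cpu_list: List[int], cpu_node: Dict[int, int],
                numa_to_cpu: Dict[int, List[int]],
                allowed: List[int]) -> List[int]:
    if not cpu_list:
        return []
    nodes = {cpu_node[c] for c in cpu_list}
    if len(nodes) != 1:
        return cpu_list  # already spans multiple nodes; keep as-is
    next_node = (nodes.pop() + 1) % len(numa_to_cpu)
    extra = [c for c in numa_to_cpu[next_node] if c in allowed]
    return sorted(set(cpu_list) | set(extra))


# Matches the expectations in test_extend_numa:
print(extend_numa([0, 1], {0: 0, 1: 0, 2: 1, 3: 1},
                  {0: [0, 1], 1: [2, 3]}, [0, 1, 2, 3]))  # [0, 1, 2, 3]
print(extend_numa([0, 1], {0: 0, 1: 0, 2: 1, 3: 1},
                  {0: [0, 1], 1: [2, 3]}, [0, 1, 3]))     # [0, 1, 3]
```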