diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py
index 9f43b2d..b0973b1 100644
--- a/vllm_ascend/ascend_config.py
+++ b/vllm_ascend/ascend_config.py
@@ -101,6 +101,8 @@ class AscendConfig:
                 raise AssertionError(
                     "oproj_tensor_parallel_size is only supported in pd scenario and can only be used in D node."
                 )
+        self.enable_cpu_binding = additional_config.get(
+            "enable_cpu_binding", False)
         self.pd_tp_ratio = 1
         self.pd_head_ratio = 1
         self.num_head_replica = 1
diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py
new file mode 100644
index 0000000..280e516
--- /dev/null
+++ b/vllm_ascend/cpu_binding.py
@@ -0,0 +1,323 @@
+import os
+import subprocess
+from dataclasses import dataclass
+from itertools import accumulate
+from typing import Dict, List, Optional, Tuple, Union
+
+import psutil
+import torch_npu
+from vllm.logger import logger
+
+ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
+CPU_BINDING_NUM = os.getenv("CPU_BINDING_NUM")
+
+
+def execute_command(cmd_list):
+    """Run *cmd_list* without a shell and return its decoded stdout."""
+    with subprocess.Popen(cmd_list,
+                          shell=False,
+                          stdout=subprocess.PIPE,
+                          stderr=subprocess.PIPE) as p:
+        out, _ = p.communicate(timeout=1000)
+    return out.decode()
+
+
+@dataclass
+class DeviceInfo:
+    """
+    Parse a single line of device information into structured data.
+    """
+    _info_line: str = ""
+    npu_id: int = 0
+    chip_id: int = 0
+    chip_logic_id: Union[int, str] = 0
+    chip_name: str = ""
+
+    def __post_init__(self):
+        npu_id_str, chip_id_str, chip_logic_id_str, self.chip_name = \
+            self._info_line.strip().split(None, 3)
+        self.npu_id = int(npu_id_str)
+        self.chip_id = int(chip_id_str)
+        # Non-numeric logic ids (e.g. "-") keep the default value 0 and are
+        # filtered out by _get_device_map_info().
+        if chip_logic_id_str.isnumeric():
+            self.chip_logic_id = int(chip_logic_id_str)
+
+
+class NpuHbmInfo:
+    visible_npu_ids: Optional[List[int]] = None
+    hbm_capacity: Optional[int] = None
+    hbm_usage: Optional[float] = None
+
+    @classmethod
+    def set_visible_devices(cls, world_size):
+        """
+        Determine which NPUs are visible to the current process and cache
+        their logical NPU IDs in ``cls.visible_npu_ids``.
+        """
+        if cls.visible_npu_ids:
+            return
+        if ASCEND_RT_VISIBLE_DEVICES is None:
+            devices = sorted(list(_get_device_map_info().keys()))
+        else:
+            devices = [int(x) for x in ASCEND_RT_VISIBLE_DEVICES.split(",")]
+        device_map_info = _get_device_map_info()
+        npu_ids = []
+        for device in devices:
+            device_info = device_map_info.get(device)
+            if device_info is None:
+                raise RuntimeError(
+                    f"Device {device} not found in device_map_info")
+            npu_ids.append(device_info.npu_id)
+        cls.visible_npu_ids = npu_ids
+
+    @classmethod
+    def get_hbm_capacity(cls, rank, world_size, need_nz):
+        """
+        Query and cache the HBM (or DDR) capacity in **bytes** for the NPU
+        assigned to the current process.
+        """
+        if cls.hbm_capacity:
+            return cls.hbm_capacity
+        if not cls.visible_npu_ids:
+            cls.set_visible_devices(world_size)
+        assert cls.visible_npu_ids is not None
+        npu_id = cls.visible_npu_ids[rank]
+        memory_info = execute_command(
+            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
+             "memory"]).split("\n")[1:]
+        soc_version = torch_npu._C._npu_get_soc_version()
+        if soc_version == 240:
+            hbm_capacity_key = 'Capacity(MB)'
+        elif not need_nz:
+            hbm_capacity_key = 'HBM Capacity(MB)'
+        else:
+            hbm_capacity_key = 'DDR Capacity(MB)'
+        for line in memory_info:
+            try:
+                # Split only on the first ':' so lines whose value contains
+                # a colon are not silently skipped.
+                key, value = line.strip().split(':', 1)
+                if key.strip() == hbm_capacity_key:
+                    cls.hbm_capacity = int(value.strip()) * 1024 * 1024
+                    return cls.hbm_capacity
+            except ValueError:
+                pass
+        raise ValueError('not found valid hbm capacity')
+
+    @classmethod
+    def get_hbm_usage(cls, rank, world_size, need_nz):
+        """
+        Return (and cache) the current HBM or DDR usage ratio (0-1) for the
+        NPU assigned to the given rank.
+        """
+        if cls.hbm_usage:
+            return cls.hbm_usage
+        if not cls.visible_npu_ids:
+            cls.set_visible_devices(world_size)
+        assert cls.visible_npu_ids is not None
+        npu_id = cls.visible_npu_ids[rank]
+        usage_info = execute_command(
+            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
+             "usages"]).split("\n")[1:]
+        soc_version = torch_npu._C._npu_get_soc_version()
+        if soc_version == 240:
+            hbm_usage_key = 'Memory Usage Rate(%)'
+        elif not need_nz:
+            hbm_usage_key = 'HBM Usage Rate(%)'
+        else:
+            hbm_usage_key = 'DDR Usage Rate(%)'
+        for line in usage_info:
+            try:
+                key, value = line.strip().split(':', 1)
+                if key.strip() == hbm_usage_key:
+                    # +1% head-room so the reported ratio errs on the high
+                    # side — presumably deliberate; TODO confirm.
+                    cls.hbm_usage = (float(value.strip()) + 1) / 100
+                    return cls.hbm_usage
+            except ValueError:
+                pass
+        raise ValueError('not found valid hbm usage')
+
+
+def _get_device_map_info() -> Dict[int, DeviceInfo]:
+    """
+    Build and return a mapping from logical chip ID (int) to its
+    DeviceInfo object.
+    """
+    device_map_info = {}
+    device_map = execute_command(["npu-smi", "info",
+                                  "-m"]).strip().split("\n")[1:]
+    for line in device_map:
+        device_info = DeviceInfo(line.strip())
+        if isinstance(device_info.chip_logic_id, int):
+            device_map_info[device_info.chip_logic_id] = device_info
+    return device_map_info
+
+
+def _get_pcie_info(devices: List[int], keyword="PCIeBusInfo"):
+    """
+    Query each NPU in *devices* and return a mapping from logical device
+    ID to its PCIe bus address.
+    """
+    device_map_info = _get_device_map_info()
+    device_pcie_tbl = {}
+    for device in devices:
+        device_info = device_map_info.get(device)
+        if not device_info:
+            raise RuntimeError(
+                "Can not get device info, you can use BIND_CPU=0 to skip.")
+        pcie_info = execute_command([
+            "npu-smi", "info", "-t", "board", "-i", f"{device_info.npu_id}",
+            "-c", f"{device_info.chip_id}"
+        ]).strip().split("\n")
+        for raw_line in pcie_info:
+            line = ''.join(raw_line.split())
+            if line.startswith(keyword):
+                device_pcie_tbl[device] = line[len(keyword) + 1:]
+                break
+    return device_pcie_tbl
+
+
+def _get_numa_info(pcie_tbl, keyword="NUMAnode"):
+    """
+    Build two mappings: device -> NUMA node, and NUMA node -> [devices].
+    """
+    device_numa_tbl: Dict[int, int] = {}  # device id -> numa id
+    numa_devices_tbl: Dict[int, List[int]] = {}  # numa id -> device ids
+    for device, pcie_no in pcie_tbl.items():
+        numa_info = execute_command(["lspci", "-s", f"{pcie_no}",
+                                     "-vvv"]).split("\n")
+        for raw_line in numa_info:
+            line = ''.join(raw_line.split())
+            if line.startswith(keyword):
+                numa_id = int(line[len(keyword) + 1:])
+                device_numa_tbl[device] = numa_id
+                numa_devices_tbl.setdefault(numa_id, []).append(device)
+                break
+    return device_numa_tbl, numa_devices_tbl
+
+
+def _get_numa_info_v2(
+        devices: List[int],
+        keyword="NUMAnode(s)") -> Tuple[Dict[int, int], Dict[int, List[int]]]:
+    """
+    Evenly distribute *devices* across all NUMA nodes and return both
+    device-to-numa and numa-to-devices mappings.
+    """
+    numa_nodes = 1
+    numa_info = execute_command(["lscpu"]).split("\n")
+    for raw_line in numa_info:
+        line = ''.join(raw_line.split())
+        if keyword not in line:
+            continue
+        # Parse the full number after the colon; int(line[-1]) would break
+        # on hosts with 10 or more NUMA nodes.
+        numa_nodes = int(line.split(":", 1)[1])
+        break
+
+    device_per_numa, tail_device = divmod(len(devices), numa_nodes)
+    device_count_per_numa_list = [
+        device_per_numa + (i < tail_device) for i in range(numa_nodes)
+    ]
+    ends = list(accumulate(device_count_per_numa_list))
+    starts = [0] + ends[:-1]
+    numa_devices_tbl = {
+        ind: devices[start:end]
+        for ind, (start, end) in enumerate(zip(starts, ends))
+    }
+    device_numa_tbl = {
+        device: numa
+        for numa, _devices in numa_devices_tbl.items()
+        for device in _devices
+    }
+    return device_numa_tbl, numa_devices_tbl
+
+
+def _get_cpu_info(numa_ids, keyword1="NUMAnode", keyword2="CPU(s)"):
+    """
+    Parse lscpu output into a dict mapping each NUMA node ID to the list
+    of CPU core IDs belonging to it.
+    """
+    cpu_idx_tbl = dict()
+    numa_keywords = [keyword1 + str(idx) + keyword2 for idx in numa_ids]
+    cpu_info = execute_command(["lscpu"]).split("\n")
+    for raw_line in cpu_info:
+        line = ''.join(raw_line.split())
+        if any(line.startswith(word) for word in numa_keywords):
+            split_info = line.split(":")
+            cpu_id_ranges = split_info[-1].split(",")
+            ranges = list()
+            for range_str in cpu_id_ranges:
+                endpoints = range_str.split("-")
+                if len(endpoints) != 2:
+                    raise Exception(
+                        "lscpu command output error, please check !")
+                ranges += list(range(int(endpoints[0]), int(endpoints[1]) + 1))
+            numa_id = int(split_info[0].replace(keyword1,
+                                                '').replace(keyword2, ''))
+            cpu_idx_tbl[numa_id] = ranges
+    return cpu_idx_tbl
+
+
+def bind_cpus(rank_id, ratio=0.5):
+    """
+    Pin the current process to CPU cores that share a NUMA node with the
+    NPU assigned to *rank_id*.
+
+    Raises RuntimeError when NUMA/CPU information cannot be resolved or
+    there are not enough cores, and ValueError when CPU_BINDING_NUM is
+    negative.
+    """
+    # Get all visible devices.
+    if ASCEND_RT_VISIBLE_DEVICES is None:
+        devices = sorted(list(_get_device_map_info().keys()))
+    else:
+        devices = [int(x) for x in ASCEND_RT_VISIBLE_DEVICES.split(",")]
+
+    # Query the NUMA affinity of each NPU via its PCIe address; if this
+    # fails, fall back to evenly distributing the devices across NUMA nodes.
+    device_pcie_tbl = _get_pcie_info(devices)
+    device_numa_tbl, numa_devices_tbl = _get_numa_info(device_pcie_tbl)
+    if not device_numa_tbl or not numa_devices_tbl:
+        device_numa_tbl, numa_devices_tbl = _get_numa_info_v2(devices)
+
+    # Obtain the complete list of CPU cores for each NUMA node.
+    cpu_idx_tbl = _get_cpu_info(list(numa_devices_tbl.keys()))
+
+    cur_device = devices[rank_id]
+    numa_id = device_numa_tbl.get(cur_device)
+    if numa_id is None:
+        raise RuntimeError(
+            f"Can not resolve NUMA node for device {cur_device}.")
+
+    # Within the NUMA node, evenly partition the CPU cores among all NPUs
+    # (or use the amount specified by CPU_BINDING_NUM).
+    shard_devices = sorted(numa_devices_tbl[numa_id])
+    all_cpus = cpu_idx_tbl[numa_id]
+    logger.info(
+        f"rank_id: {rank_id}, device_id: {cur_device}, "
+        f"numa_id: {numa_id}, shard_devices: {shard_devices}, cpus: {all_cpus}"
+    )
+
+    cpu_nums = len(all_cpus)
+    if CPU_BINDING_NUM is None:
+        cpu_num_per_device = int(cpu_nums * ratio // len(shard_devices))
+    else:
+        cpu_num_per_device = int(CPU_BINDING_NUM)
+        # Validate the user-provided value before using it.
+        if cpu_num_per_device < 0:
+            raise ValueError("CPU_BINDING_NUM should not be less than 0.")
+        if len(shard_devices) * cpu_num_per_device > cpu_nums:
+            raise RuntimeError(
+                f"Cpu num in numa {numa_id} to assign {cpu_num_per_device} for every device is not enough, "
+                f"please decrease the value of CPU_BINDING_NUM!")
+
+    idx = shard_devices.index(cur_device)
+    binding_cpus = all_cpus[idx * cpu_num_per_device:(idx + 1) *
+                            cpu_num_per_device]
+
+    # cpu bind
+    p = psutil.Process()
+    p.cpu_affinity(binding_cpus)
+    new_affinity = p.cpu_affinity()
+    logger.info(
+        f"process {p.pid}, new_affinity is {new_affinity}, cpu count {cpu_num_per_device}"
+    )
diff --git a/vllm_ascend/worker/worker_v1.py b/vllm_ascend/worker/worker_v1.py
index d8be9c2..f14823f 100644
--- a/vllm_ascend/worker/worker_v1.py
+++ b/vllm_ascend/worker/worker_v1.py
@@ -43,7 +43,8 @@ from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
 from vllm.v1.worker.worker_base import WorkerBase
 
 import vllm_ascend.envs as envs_ascend
-from vllm_ascend.ascend_config import init_ascend_config
+from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config
+from vllm_ascend.cpu_binding import bind_cpus
 from vllm_ascend.device_allocator.camem import CaMemAllocator
 from vllm_ascend.distributed.parallel_state import init_ascend_model_parallel
 from vllm_ascend.platform import NPUPlatform
@@ -110,6 +111,15 @@ class NPUWorker(WorkerBase):
             distributed_init_method=distributed_init_method,
             is_driver_worker=is_driver_worker)
 
+        # binding cpu
+        if get_ascend_config().enable_cpu_binding:
+            try:
+                bind_cpus(self.local_rank, ratio=1.0)
+            except (RuntimeError, ValueError) as e:
+                logger.error(f"{e} in {self.local_rank}")
+            except Exception:
+                logger.info("Skip binding cpu.")
+
         # Try to import mindie_turbo to accelerate vLLM inference.
         try_register_lib(
             "mindie_turbo",