# Source file: xc-llm-ascend/vllm_ascend/cpu_binding.py
import os
import subprocess
from dataclasses import dataclass
from itertools import accumulate
from typing import Dict, List, Optional, Tuple, Union
import psutil
import torch_npu
from vllm.logger import logger
# Comma-separated list of NPU device IDs visible to this process; None means
# "no restriction" and all devices reported by npu-smi are used.
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
# Optional override for how many CPU cores to bind per device; when unset,
# bind_cpus() derives the count from the NUMA node's core count and a ratio.
CPU_BINDING_NUM = os.getenv("CPU_BINDING_NUM")
def execute_command(cmd_list):
    """Run *cmd_list* (argv-style, ``shell=False``) and return decoded stdout.

    Args:
        cmd_list: Command and arguments as a list, e.g. ``["npu-smi", "info"]``.

    Returns:
        The child's stdout decoded with the default encoding.

    Raises:
        subprocess.TimeoutExpired: if the command does not finish in time.
            The child is killed first, so a hung ``npu-smi``/``lspci`` call
            cannot leak a running process (the original code left the child
            alive, and ``Popen.__exit__`` would then block on it forever).
    """
    with subprocess.Popen(cmd_list,
                          shell=False,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as p:
        try:
            out, _ = p.communicate(timeout=1000)
        except subprocess.TimeoutExpired:
            # Per the subprocess docs: kill, then drain the pipes, then
            # re-raise so the caller still sees the timeout.
            p.kill()
            p.communicate()
            raise
    return out.decode()
@dataclass
class DeviceInfo:
    """
    Parse a single line of ``npu-smi info -m`` device information into
    structured data.

    A line has the form ``<npu_id> <chip_id> <chip_logic_id> <chip_name>``.
    ``chip_logic_id`` is numeric for compute chips; for chips without a
    logical ID npu-smi prints a placeholder (e.g. "-"), which is kept as a
    string so that callers filtering on ``isinstance(chip_logic_id, int)``
    can exclude such chips.
    """
    _info_line: str = ""
    npu_id: int = 0
    chip_id: int = 0
    chip_logic_id: Union[int, str] = 0
    chip_name: str = ""

    def __post_init__(self):
        npu_id_str, chip_id_str, chip_logic_id_str, self.chip_name = self._info_line.strip(
        ).split(None, 3)
        self.npu_id = int(npu_id_str)
        self.chip_id = int(chip_id_str)
        if chip_logic_id_str.isnumeric():
            self.chip_logic_id = int(chip_logic_id_str)
        else:
            # Bug fix: previously the field silently stayed at its default 0
            # (an int), so non-compute chips were mistaken for logical
            # device 0 by isinstance-based filters downstream.
            self.chip_logic_id = chip_logic_id_str
class NpuHbmInfo:
    """Process-wide cache of NPU memory (HBM/DDR) capacity and usage.

    All state is stored on the class itself, so each value is queried from
    ``npu-smi`` at most once per process and shared across callers.
    """
    # Logical NPU ids visible to this process, indexed by rank.
    visible_npu_ids: Optional[List[int]] = None
    # Cached memory capacity in bytes (set by get_hbm_capacity).
    hbm_capacity: Optional[int] = None
    # NOTE(review): get_hbm_usage() returns without ever assigning this,
    # so usage is re-queried on every call — confirm whether caching was
    # intended.
    hbm_usage: Optional[int] = None

    @classmethod
    def set_visible_devices(cls, world_size):
        """
        Determine which NPUs are visible to the current process and cache their
        logical NPU IDs in `cls.visible_npu_ids`.
        """
        if cls.visible_npu_ids:
            return
        if ASCEND_RT_VISIBLE_DEVICES is None:
            # No restriction: every device reported by npu-smi is visible.
            devices = sorted(list(_get_device_map_info().keys()))
        else:
            devices = [int(x) for x in ASCEND_RT_VISIBLE_DEVICES.split(",")]
            devices_str = ASCEND_RT_VISIBLE_DEVICES
            devices = [int(x) for x in devices_str.split(",")]
        device_map_info = _get_device_map_info()
        npu_ids = []
        for device in devices:
            device_info = device_map_info.get(device)
            if device_info is None:
                raise RuntimeError(
                    f"Device {device} not found in device_map_info")
            npu_ids.append(device_info.npu_id)
        cls.visible_npu_ids = npu_ids

    @classmethod
    def get_hbm_capacity(cls, rank, world_size, need_nz):
        """
        Query and cache the HBM (or DDR) capacity in **bytes** for the NPU assigned
        to the current process.
        """
        soc_version = torch_npu._C._npu_get_soc_version()
        if cls.hbm_capacity:
            return cls.hbm_capacity
        if not cls.visible_npu_ids:
            cls.set_visible_devices(world_size)
        assert cls.visible_npu_ids is not None
        npu_id = cls.visible_npu_ids[rank]
        memory_info = execute_command(
            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
             "memory"]).split("\n")[1:]
        # Choose which table row to read depending on the SoC generation and
        # the memory layout flag (soc 240 reports a plain "Capacity" row).
        if soc_version == 240:
            hbm_capacity_key = 'Capacity(MB)'
        elif not need_nz:
            hbm_capacity_key = 'HBM Capacity(MB)'
        else:
            hbm_capacity_key = 'DDR Capacity(MB)'
        for line in memory_info:
            try:
                # NOTE(review): maxsplit=2 means a row with two or more colons
                # yields three parts and is skipped via ValueError — presumably
                # the rows of interest contain a single colon; confirm against
                # actual npu-smi output (maxsplit=1 may have been intended).
                key, value = line.strip().split(':', 2)
                if key.strip() == hbm_capacity_key:
                    # Value is reported in MB; convert to bytes.
                    cls.hbm_capacity = int(value.strip()) * 1024 * 1024
                    return cls.hbm_capacity
            except ValueError:
                # Not a "key: value" row — ignore.
                pass
        raise ValueError('not found valid hbm capactiy')

    @classmethod
    def get_hbm_usage(cls, rank, world_size, need_nz):
        """
        Return the current HBM or DDR usage
        ratio (0-1) for the NPU assigned to the given rank.
        """
        if cls.hbm_usage:
            return cls.hbm_usage
        if not cls.visible_npu_ids:
            cls.set_visible_devices(world_size)
        assert cls.visible_npu_ids is not None
        npu_id = cls.visible_npu_ids[rank]
        usage_info = execute_command(
            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
             "usages"]).split("\n")[1:]
        soc_version = torch_npu._C._npu_get_soc_version()
        if soc_version == 240:
            hbm_capacity_key = 'Memory Usage Rate(%)'
        elif not need_nz:
            hbm_capacity_key = 'HBM Usage Rate(%)'
        else:
            hbm_capacity_key = 'DDR Usage Rate(%)'
        for line in usage_info:
            try:
                key, value = line.strip().split(':', 2)
                if key.strip() == hbm_capacity_key:
                    # NOTE(review): the +1 inflates the reported percentage by
                    # one point — looks like a deliberate safety margin so the
                    # caller never under-estimates usage; confirm intent.
                    hbm_usage = (float(value.strip()) + 1) / 100
                    return hbm_usage
            except ValueError:
                # Not a "key: value" row — ignore.
                pass
        raise ValueError('not found valid hbm usage')
def _get_device_map_info() -> Dict[int, DeviceInfo]:
    """
    Return a mapping from logical chip ID (int) to its DeviceInfo object,
    built from ``npu-smi info -m`` output (header row excluded).
    """
    raw_rows = execute_command(["npu-smi", "info",
                                "-m"]).strip().split("\n")
    mapping: Dict[int, DeviceInfo] = {}
    # Skip the header row; keep only chips that expose a numeric logic id.
    for row in raw_rows[1:]:
        info = DeviceInfo(row.strip())
        if isinstance(info.chip_logic_id, int):
            mapping[info.chip_logic_id] = info
    return mapping
def _get_pcie_info(devices: List[int], keyword="PCIeBusInfo"):
    """
    Query each NPU in *devices* via ``npu-smi info -t board`` and return a
    mapping from logical device ID to its PCIe bus address.
    """
    device_map_info = _get_device_map_info()
    pcie_tbl = {}
    for dev in devices:
        info = device_map_info.get(dev)
        if not info:
            raise RuntimeError(
                "Can not get device info, you can use BIND_CPU=0 to skip.")
        board_lines = execute_command([
            "npu-smi", "info", "-t", "board", "-i", f"{info.npu_id}",
            "-c", f"{info.chip_id}"
        ]).strip().split("\n")
        for raw in board_lines:
            # Collapse all whitespace so "PCIe Bus Info : ..." matches.
            condensed = ''.join(raw.split())
            if condensed.startswith(keyword):
                # Everything after "<keyword>:" is the bus address.
                pcie_tbl[dev] = condensed[len(keyword) + 1:]
                break
    return pcie_tbl
def _get_numa_info(pcie_tbl, keyword="NUMAnode"):
    """
    From a device->PCIe-address table, build two mappings via ``lspci``:
    device -> NUMA node, and NUMA node -> list of devices.
    """
    device_numa_tbl: Dict[int, int] = {}  # device id -> numa id
    numa_devices_tbl: Dict[int, List[int]] = {}  # numa id -> device ids
    for device, pcie_no in pcie_tbl.items():
        lspci_lines = execute_command(["lspci", "-s", f"{pcie_no}",
                                       "-vvv"]).split("\n")
        for raw in lspci_lines:
            # Collapse whitespace so "NUMA node: N" matches the keyword.
            condensed = ''.join(raw.split())
            if not condensed.startswith(keyword):
                continue
            numa_id = int(condensed[len(keyword) + 1:])
            device_numa_tbl[device] = numa_id
            numa_devices_tbl.setdefault(numa_id, []).append(device)
            break
    return device_numa_tbl, numa_devices_tbl
def _get_numa_info_v2(
        devices: List[int],
        keyword="NUMAnode(s)") -> Tuple[Dict[int, int], Dict[int, List[int]]]:
    """
    Evenly distribute the given device list across all NUMA nodes and return
    both device-to-numa and numa-to-devices mappings.

    Fallback used when PCIe-based NUMA affinity detection yields nothing.
    Defaults to a single NUMA node if lscpu does not report the count.
    """
    numa_nodes = 1
    numa_info = execute_command(["lscpu"]).split("\n")
    for raw in numa_info:
        line = ''.join(raw.split())
        if keyword not in line:
            continue
        # Bug fix: parse the full value after the colon. The old code read
        # only the last character (int(line[-1])), which is wrong on hosts
        # with 10 or more NUMA nodes.
        numa_nodes = int(line.split(":")[-1])
        break
    # Split devices as evenly as possible: the first `tail_device` nodes
    # receive one extra device each.
    device_per_numa, tail_device = divmod(len(devices), numa_nodes)
    device_count_per_numa_list = [
        device_per_numa + (i < tail_device) for i in range(numa_nodes)
    ]
    ends = list(accumulate(device_count_per_numa_list))
    starts = [0] + ends[:-1]
    numa_devices_tbl = {
        ind: devices[start:end]
        for ind, (start, end) in enumerate(zip(starts, ends))
    }
    device_numa_tbl = {
        device: numa
        for numa, _devices in numa_devices_tbl.items()
        for device in _devices
    }
    return device_numa_tbl, numa_devices_tbl
def _get_cpu_info(numa_ids, keyword1="NUMAnode", keyword2="CPU(s)"):
    """
    Parse lscpu output to build a dict that maps each NUMA node ID to the
    list of CPU core IDs belonging to it.

    Accepts range lists in the forms lscpu actually emits, e.g.
    ``0-31``, ``0,2-5`` (isolated single cores), and an empty list for a
    CPU-less node. Raises Exception on anything else.
    """
    cpu_idx_tbl = dict()
    # Pre-compute the condensed prefixes, e.g. "NUMAnode0CPU(s)".
    numa_keywords = [keyword1 + str(idx) + keyword2 for idx in numa_ids]
    cpu_info = execute_command(["lscpu"]).split("\n")
    for raw in cpu_info:
        line = ''.join(raw.split())
        if not any(line.startswith(word) for word in numa_keywords):
            continue
        split_info = line.split(":")
        cpu_id_ranges = split_info[-1].split(",")
        ranges = list()
        for range_str in cpu_id_ranges:
            if not range_str:
                # CPU-less NUMA node: lscpu prints an empty value.
                continue
            endpoints = range_str.split("-")
            if len(endpoints) == 1:
                # Bug fix: a single isolated core (e.g. the "0" in "0,2-5")
                # previously raised; accept it.
                ranges.append(int(endpoints[0]))
            elif len(endpoints) == 2:
                ranges.extend(
                    range(int(endpoints[0]),
                          int(endpoints[1]) + 1))
            else:
                raise Exception(
                    "lscpu command output error, please check !")
        # Recover the node id from "NUMAnode<N>CPU(s)".
        numa_id = int(split_info[0].replace(keyword1,
                                            '').replace(keyword2, ''))
        cpu_idx_tbl[numa_id] = ranges
    return cpu_idx_tbl
def bind_cpus(rank_id, ratio=0.5):
    """Pin the current process to CPU cores on the NUMA node of its NPU.

    Args:
        rank_id: Index of this process's device within the visible devices.
        ratio: Fraction of the NUMA node's cores to divide among its NPUs
            when CPU_BINDING_NUM is not set.

    Raises:
        RuntimeError: when NUMA/CPU information cannot be resolved, or the
            requested per-device core count exceeds what the node has.
        ValueError: when CPU_BINDING_NUM is negative.
    """
    # Get all visible devices.
    visible_devices = ASCEND_RT_VISIBLE_DEVICES
    if visible_devices is None:
        devices = sorted(list(_get_device_map_info().keys()))
    else:
        devices = [int(x) for x in visible_devices.split(",")]
    # Query the NUMA affinity of each NPU via its PCIe address; if this fails,
    # fall back to evenly distributing the devices across NUMA nodes.
    device_pcie_tbl = _get_pcie_info(devices)
    device_numa_tbl, numa_devices_tbl = _get_numa_info(device_pcie_tbl)
    if not device_numa_tbl or not numa_devices_tbl:
        device_numa_tbl, numa_devices_tbl = _get_numa_info_v2(devices)
    # Obtain the complete list of CPU cores for each NUMA node.
    cpu_idx_tbl = _get_cpu_info(list(numa_devices_tbl.keys()))
    cur_device = devices[rank_id]
    numa_id = device_numa_tbl.get(cur_device)
    # Bug fix: fail with a clear message instead of an AttributeError on
    # None when the lookup tables are missing an entry.
    if numa_id is None or numa_id not in numa_devices_tbl:
        raise RuntimeError(
            f"Can not get NUMA node of device {cur_device}, "
            "you can use BIND_CPU=0 to skip.")
    # Within the NUMA node, evenly partition the CPU cores among all NPUs
    # (or use the amount specified by CPU_BINDING_NUM). Use a sorted copy so
    # the shared table is not mutated in place.
    shard_devices = sorted(numa_devices_tbl[numa_id])
    all_cpus = cpu_idx_tbl.get(numa_id)
    if not all_cpus:
        raise RuntimeError(
            f"Can not get CPU cores of NUMA node {numa_id}, "
            "you can use BIND_CPU=0 to skip.")
    logger.info(
        f"rank_id: {rank_id}, device_id: {cur_device}, "
        f"numa_id: {numa_id}, shard_devices: {shard_devices}, cpus: {all_cpus}"
    )
    cpu_nums = len(all_cpus)
    if CPU_BINDING_NUM is None:
        cpu_num_per_device = int(cpu_nums * ratio // len(shard_devices))
    else:
        cpu_num_per_device = int(CPU_BINDING_NUM)
        # Bug fix: validate the user-supplied value before using it.
        if cpu_num_per_device < 0:
            raise ValueError("CPU_BINDING_NUM should not be less than 0.")
    if len(shard_devices) * cpu_num_per_device > cpu_nums:
        raise RuntimeError(
            f"Cpu num in numa {numa_id} to assign {cpu_num_per_device} for every device is not enough, "
            f"please decrease the value of CPU_BINDING_NUM!")
    # This device's slice of the node's core list.
    idx = shard_devices.index(cur_device)
    binding_cpus = all_cpus[idx * cpu_num_per_device:(idx + 1) *
                            cpu_num_per_device]
    # CPU bind.
    p = psutil.Process()
    p.cpu_affinity(binding_cpus)
    new_affinity = p.cpu_affinity()
    logger.info(
        f"process {p.pid}, new_affinity is {new_affinity}, cpu count {cpu_num_per_device}"
    )