[main] support cpu binding (#3546)

### What this PR does / why we need it?

Currently, in the piecewise mode of aclgraph, the model runs attention in
eager mode, which causes abnormal allreduce latency for the O matrix.
The reason is that CPU resources can be preempted in eager mode. So I
would like to temporarily add CPU binding to vllm-ascend.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

CI passed with new and existing tests.

- vLLM version: v0.11.0rc3
- vLLM main: https://github.com/vllm-project/vllm/commit/v0.11.0

Signed-off-by: GDzhu1 <809721801@qq.com>
This commit is contained in:
Zhu Yi Lin
2025-10-21 09:17:03 +08:00
committed by GitHub
parent 274b708e0c
commit 4a849df6fa
3 changed files with 345 additions and 1 deletions

View File

@@ -101,6 +101,8 @@ class AscendConfig:
raise AssertionError(
"oproj_tensor_parallel_size is only supported in pd scenario and can only be used in D node."
)
self.enable_cpu_binding = additional_config.get(
"enable_cpu_binding", False)
self.pd_tp_ratio = 1
self.pd_head_ratio = 1
self.num_head_replica = 1

330
vllm_ascend/cpu_binding.py Normal file
View File

@@ -0,0 +1,330 @@
import os
import subprocess
from dataclasses import dataclass
from itertools import accumulate
from typing import Dict, List, Optional, Tuple, Union
import psutil
import torch_npu
from vllm.logger import logger
ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES")
CPU_BINDING_NUM = os.getenv("CPU_BINDING_NUM")
def execute_command(cmd_list):
    """Run *cmd_list* (argv-style, no shell) and return its decoded stdout.

    Raises:
        subprocess.TimeoutExpired: if the command does not finish within
            1000 seconds. The child is killed before re-raising so a hung
            command does not linger as an orphan process.
    """
    with subprocess.Popen(cmd_list,
                          shell=False,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as p:
        try:
            out, _ = p.communicate(timeout=1000)
        except subprocess.TimeoutExpired:
            # Without the kill, the timed-out child keeps running and its
            # pipes stay open.
            p.kill()
            raise
    return out.decode()
@dataclass
class DeviceInfo:
    """
    Structured view of one line of `npu-smi info -m` output.

    The raw line is supplied as ``_info_line``; ``__post_init__`` parses it
    into NPU id, chip id, logical chip id and chip name.
    """
    _info_line: str = ""
    npu_id: int = 0
    chip_id: int = 0
    chip_logic_id: Union[int, str] = 0
    chip_name: str = ""

    def __post_init__(self):
        # First three whitespace-separated fields are ids; everything after
        # them (possibly multi-word) is the chip name.
        fields = self._info_line.strip().split(None, 3)
        npu_id_str, chip_id_str, chip_logic_id_str, self.chip_name = fields
        self.npu_id = int(npu_id_str)
        self.chip_id = int(chip_id_str)
        # Non-numeric logical ids (e.g. "-") keep the default value.
        if chip_logic_id_str.isnumeric():
            self.chip_logic_id = int(chip_logic_id_str)
class NpuHbmInfo:
    """
    Class-level cache of NPU visibility plus HBM/DDR capacity and usage,
    all queried through npu-smi.
    """
    # Logical NPU ids visible to this process, in rank order.
    visible_npu_ids: Optional[List[int]] = None
    # Total HBM/DDR capacity in bytes (cached after the first query).
    hbm_capacity: Optional[int] = None
    # NOTE(review): get_hbm_usage never assigns this attribute, so its cache
    # guard is currently dead and usage is re-queried every call — confirm
    # whether caching a point-in-time usage value was ever intended.
    hbm_usage: Optional[int] = None

    @classmethod
    def set_visible_devices(cls, world_size):
        """
        Determine which NPUs are visible to the current process and cache their
        logical NPU IDs in `cls.visible_npu_ids`.
        """
        if cls.visible_npu_ids:
            return
        if ASCEND_RT_VISIBLE_DEVICES is None:
            devices = sorted(list(_get_device_map_info().keys()))
        else:
            devices_str = ASCEND_RT_VISIBLE_DEVICES
            devices = [int(x) for x in devices_str.split(",")]
        device_map_info = _get_device_map_info()
        npu_ids = []
        for device in devices:
            device_info = device_map_info.get(device)
            if device_info is None:
                raise RuntimeError(
                    f"Device {device} not found in device_map_info")
            npu_ids.append(device_info.npu_id)
        cls.visible_npu_ids = npu_ids

    @classmethod
    def get_hbm_capacity(cls, rank, world_size, need_nz):
        """
        Query and cache the HBM (or DDR) capacity in **bytes** for the NPU
        assigned to the current process.
        """
        if cls.hbm_capacity:
            return cls.hbm_capacity
        if not cls.visible_npu_ids:
            cls.set_visible_devices(world_size)
        assert cls.visible_npu_ids is not None
        npu_id = cls.visible_npu_ids[rank]
        # Query the SoC version only after a cache miss is confirmed (it was
        # previously fetched even when the cached value was returned).
        soc_version = torch_npu._C._npu_get_soc_version()
        memory_info = execute_command(
            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
             "memory"]).split("\n")[1:]
        if soc_version == 240:
            hbm_capacity_key = 'Capacity(MB)'
        elif not need_nz:
            hbm_capacity_key = 'HBM Capacity(MB)'
        else:
            hbm_capacity_key = 'DDR Capacity(MB)'
        for line in memory_info:
            try:
                # maxsplit=1 keeps any extra ':' inside the value; the old
                # maxsplit=2 two-target unpack raised on 3-field lines and
                # silently skipped them even when the key matched.
                key, value = line.strip().split(':', 1)
                if key.strip() == hbm_capacity_key:
                    cls.hbm_capacity = int(value.strip()) * 1024 * 1024
                    return cls.hbm_capacity
            except ValueError:
                pass
        raise ValueError('not found valid hbm capacity')

    @classmethod
    def get_hbm_usage(cls, rank, world_size, need_nz):
        """
        Return the current HBM or DDR usage
        ratio (0-1) for the NPU assigned to the given rank.
        """
        if cls.hbm_usage:
            return cls.hbm_usage
        if not cls.visible_npu_ids:
            cls.set_visible_devices(world_size)
        assert cls.visible_npu_ids is not None
        npu_id = cls.visible_npu_ids[rank]
        usage_info = execute_command(
            ["npu-smi", "info", "-i", f"{npu_id}", "-t",
             "usages"]).split("\n")[1:]
        soc_version = torch_npu._C._npu_get_soc_version()
        if soc_version == 240:
            hbm_capacity_key = 'Memory Usage Rate(%)'
        elif not need_nz:
            hbm_capacity_key = 'HBM Usage Rate(%)'
        else:
            hbm_capacity_key = 'DDR Usage Rate(%)'
        for line in usage_info:
            try:
                key, value = line.strip().split(':', 1)
                if key.strip() == hbm_capacity_key:
                    # Add 1 percentage point of headroom before converting
                    # the reported percentage to a 0-1 ratio.
                    hbm_usage = (float(value.strip()) + 1) / 100
                    return hbm_usage
            except ValueError:
                pass
        raise ValueError('not found valid hbm usage')
def _get_device_map_info() -> Dict[int, DeviceInfo]:
    """
    Return a mapping from logical chip ID (int) to its DeviceInfo object,
    parsed from `npu-smi info -m` output (header line dropped).
    """
    raw_lines = execute_command(["npu-smi", "info", "-m"]).strip().split("\n")
    parsed = (DeviceInfo(raw.strip()) for raw in raw_lines[1:])
    # Entries whose logical id did not parse to an int are skipped.
    return {
        info.chip_logic_id: info
        for info in parsed if isinstance(info.chip_logic_id, int)
    }
def _get_pcie_info(devices: List[int], keyword="PCIeBusInfo"):
    """
    Map each logical device ID in *devices* to its PCIe bus address, read
    from `npu-smi info -t board`.
    """
    device_map_info = _get_device_map_info()
    device_pcie_tbl = {}
    for device in devices:
        info = device_map_info.get(device)
        if not info:
            raise RuntimeError(
                "Can not get device info, you can use BIND_CPU=0 to skip.")
        board_lines = execute_command([
            "npu-smi", "info", "-t", "board", "-i", f"{info.npu_id}", "-c",
            f"{info.chip_id}"
        ]).strip().split("\n")
        for raw in board_lines:
            # Collapse all whitespace so the line reads "PCIeBusInfo:....".
            compact = ''.join(raw.split())
            if compact.startswith(keyword):
                # Skip the keyword and the ':' that follows it.
                device_pcie_tbl[device] = compact[len(keyword) + 1:]
                break
    return device_pcie_tbl
def _get_numa_info(pcie_tbl, keyword="NUMAnode"):
    """
    From a device→PCIe-address table, build two mappings using `lspci -vvv`:
    device → NUMA node, and NUMA node → [devices].
    """
    device_numa_tbl: Dict[int, int] = {}  # device id -> numa id
    numa_devices_tbl: Dict[int, List[int]] = {}  # numa id -> device ids
    for device, pcie_no in pcie_tbl.items():
        lspci_lines = execute_command(["lspci", "-s", f"{pcie_no}",
                                       "-vvv"]).split("\n")
        for raw in lspci_lines:
            compact = ''.join(raw.split())
            if not compact.startswith(keyword):
                continue
            # Everything after "NUMAnode:" is the node number.
            numa_id = int(compact[len(keyword) + 1:])
            device_numa_tbl[device] = numa_id
            numa_devices_tbl.setdefault(numa_id, []).append(device)
            break
    return device_numa_tbl, numa_devices_tbl
def _get_numa_info_v2(
    devices: List[int],
    keyword="NUMAnode(s)") -> Tuple[Dict[int, int], Dict[int, List[int]]]:
    """
    Evenly distribute the given device list across all NUMA nodes and return
    both device-to-numa and numa-to-devices mappings.

    Fallback used when PCIe-based NUMA affinity cannot be determined.
    """
    numa_nodes = 1
    numa_info = execute_command(["lscpu"]).split("\n")
    for _ in numa_info:
        line = ''.join(_.split())
        if keyword not in line:
            continue
        # Parse the full number after ':'. The previous int(line[-1]) read
        # only the last digit, which is wrong for hosts with >= 10 nodes.
        numa_nodes = int(line.split(":")[-1])
        break
    # The first `tail_device` nodes receive one extra device each.
    device_per_numa, tail_device = divmod(len(devices), numa_nodes)
    device_count_per_numa_list = [
        device_per_numa + (i < tail_device) for i in range(numa_nodes)
    ]
    ends = list(accumulate(device_count_per_numa_list))
    starts = [0] + ends[:-1]
    numa_devices_tbl = {
        ind: devices[start:end]
        for ind, (start, end) in enumerate(zip(starts, ends))
    }
    device_numa_tbl = {
        device: numa
        for numa, _devices in numa_devices_tbl.items()
        for device in _devices
    }
    return device_numa_tbl, numa_devices_tbl
def _get_cpu_info(numa_ids, keyword1="NUMAnode", keyword2="CPU(s)"):
    """
    Parse lscpu output to build a dict that maps each NUMA
    node ID to the list of CPU core IDs belonging to it.

    Accepts both "a-b" ranges and single-core entries such as "0" in
    "0,64" (the original code raised on the latter).
    """
    cpu_idx_tbl = dict()
    numa_keywords = [keyword1 + str(idx) + keyword2 for idx in numa_ids]
    cpu_info = execute_command(["lscpu"]).split("\n")
    for _ in cpu_info:
        line = ''.join(_.split())
        if any(line.startswith(word) for word in numa_keywords):
            split_info = line.split(":")
            cpu_id_ranges = split_info[-1].split(",")
            ranges = list()
            for range_str in cpu_id_ranges:
                endpoints = range_str.split("-")
                if len(endpoints) == 1:
                    # Single core id, e.g. the "0" in "0,64".
                    ranges.append(int(endpoints[0]))
                elif len(endpoints) == 2:
                    ranges += list(
                        range(int(endpoints[0]),
                              int(endpoints[1]) + 1))
                else:
                    raise Exception(
                        "lscpu command output error, please check !")
            # Node id is what remains of "NUMAnodeNCPU(s)" after stripping
            # both keywords.
            numa_id = int(split_info[0].replace(keyword1,
                                                '').replace(keyword2, ''))
            cpu_idx_tbl[numa_id] = ranges
    return cpu_idx_tbl
def bind_cpus(rank_id, ratio=0.5):
    """
    Pin the current process to the CPU cores that share a NUMA node with
    the NPU assigned to *rank_id*.

    Args:
        rank_id: index of this worker's device within the visible devices.
        ratio: fraction of the NUMA node's cores to split among its NPUs
            when CPU_BINDING_NUM is not set.

    Raises:
        RuntimeError: if NUMA/CPU affinity information is unavailable or
            there are not enough cores; ValueError: if CPU_BINDING_NUM is
            negative. Callers treat binding as best-effort and catch these.
    """
    # get all visible devices
    visible_devices = ASCEND_RT_VISIBLE_DEVICES
    if visible_devices is None:
        devices = sorted(list(_get_device_map_info().keys()))
    else:
        devices = [int(x) for x in visible_devices.split(",")]
    # Query the NUMA affinity of each NPU via its PCIe address; if this fails,
    # fall back to evenly distributing the devices across NUMA nodes.
    device_pcie_tbl = _get_pcie_info(devices)
    device_numa_tbl, numa_devices_tbl = _get_numa_info(device_pcie_tbl)
    if not device_numa_tbl or not numa_devices_tbl:
        device_numa_tbl, numa_devices_tbl = _get_numa_info_v2(devices)
    # Obtain the complete list of CPU cores for each NUMA node.
    cpu_idx_tbl = _get_cpu_info(list(numa_devices_tbl.keys()))
    cur_device = devices[rank_id]
    numa_id = device_numa_tbl.get(cur_device)
    if numa_id is None:
        # Previously this fell through and crashed below with an
        # AttributeError on NoneType; fail with a clear, catchable error.
        raise RuntimeError(
            f"Can not get numa affinity for device {cur_device}.")
    # Within the NUMA node, evenly partition the CPU cores
    # among all NPUs (or use the amount specified by CPU_BINDING_NUM)
    shard_devices = numa_devices_tbl.get(numa_id)
    all_cpus = cpu_idx_tbl.get(numa_id)
    if not shard_devices or not all_cpus:
        raise RuntimeError(
            f"Can not get cpu/device info for numa {numa_id}.")
    shard_devices.sort()
    logger.info(
        f"rank_id: {rank_id}, device_id: {cur_device}, "
        f"numa_id: {numa_id}, shard_devices: {shard_devices}, cpus: {all_cpus}"
    )
    cpu_nums = len(all_cpus)
    if CPU_BINDING_NUM is None:
        cpu_num_per_device = int(cpu_nums * ratio // len(shard_devices))
    else:
        cpu_num_per_device = int(CPU_BINDING_NUM)
    # Validate the per-device budget before using it.
    if cpu_num_per_device < 0:
        raise ValueError("CPU_BINDING_NUM should not be less than 0.")
    if len(shard_devices) * cpu_num_per_device > cpu_nums:
        raise RuntimeError(
            f"Cpu num in numa {numa_id} to assign {cpu_num_per_device} for every device is not enough, "
            f"please decrease the value of CPU_BINDING_NUM!")
    idx = shard_devices.index(cur_device)
    binding_cpus = all_cpus[idx * cpu_num_per_device:(idx + 1) *
                            cpu_num_per_device]
    # cpu bind
    p = psutil.Process()
    p.cpu_affinity(binding_cpus)
    new_affinity = p.cpu_affinity()
    logger.info(
        f"process {p.pid}, new_affinity is {new_affinity}, cpu count {cpu_num_per_device}"
    )

View File

@@ -43,7 +43,8 @@ from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
from vllm.v1.worker.worker_base import WorkerBase
import vllm_ascend.envs as envs_ascend
from vllm_ascend.ascend_config import init_ascend_config
from vllm_ascend.ascend_config import get_ascend_config, init_ascend_config
from vllm_ascend.cpu_binding import bind_cpus
from vllm_ascend.device_allocator.camem import CaMemAllocator
from vllm_ascend.distributed.parallel_state import init_ascend_model_parallel
from vllm_ascend.platform import NPUPlatform
@@ -110,6 +111,17 @@ class NPUWorker(WorkerBase):
distributed_init_method=distributed_init_method,
is_driver_worker=is_driver_worker)
# binding cpu
if get_ascend_config().enable_cpu_binding:
try:
bind_cpus(self.local_rank, ratio=1.0)
except RuntimeError as e:
logger.error(f"{e} in {self.local_rank}")
except ValueError as e:
logger.error(f"{e} in {self.local_rank}")
except Exception:
logger.info("Skip binding cpu.")
# Try to import mindie_turbo to accelerate vLLM inference.
try_register_lib(
"mindie_turbo",