adapt to vllm-ascend v0.18.0rc1

This commit is contained in:
starkwj
2026-04-21 03:05:32 +00:00
parent 99e1ea0fe6
commit e4d898b245
132 changed files with 28743 additions and 100 deletions

View File

@@ -21,10 +21,12 @@ import os
from collections.abc import Callable
from contextlib import contextmanager
from typing import Any
import time
import torch
from acl.rt import memcpy # type: ignore # noqa: F401
from vllm.logger import logger
import vllm_ascend.envs as envs_ascend
def find_loaded_library(lib_name) -> str | None:
@@ -54,11 +56,23 @@ def find_loaded_library(lib_name) -> str | None:
camem_available = False
try:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module,
python_create_and_map,
python_unmap_and_release,
)
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module_offload as init_module,
python_create_and_map_offload as python_create_and_map,
python_unmap_and_release_offload as python_unmap_and_release,
python_get_mem_info_offload as python_get_mem_info,
python_try_lock_gpu_offload as python_try_lock_gpu,
python_unlock_gpu_offload as python_unlock_gpu
)
else:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module,
python_create_and_map,
python_unmap_and_release,
)
python_get_mem_info = None
python_try_lock_gpu = None
python_unlock_gpu = None
lib_name = find_loaded_library("vllm_ascend_C")
camem_available = True
@@ -67,6 +81,9 @@ except ImportError as e:
init_module = None
python_create_and_map = None
python_unmap_and_release = None
python_get_mem_info = None
python_try_lock_gpu = None
python_unlock_gpu = None
lib_name = None
libcudart = None
@@ -93,8 +110,17 @@ def get_pluggable_allocator(
python_malloc_fn: Callable[[tuple[int, int, int, int]], None],
python_free_func: Callable[[int], tuple[int, int, int, int]],
) -> torch.npu.memory.NPUPluggableAllocator:
init_module(python_malloc_fn, python_free_func)
new_alloc = torch.npu.memory.NPUPluggableAllocator(lib_name, "my_malloc", "my_free")
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
current_device = torch.npu.current_device()
init_module(python_malloc_fn, python_free_func, current_device)
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc_offload', 'my_free_offload'
)
else:
init_module(python_malloc_fn, python_free_func)
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc', 'my_free'
)
return new_alloc
@@ -245,6 +271,9 @@ class CaMemAllocator:
# to avoid the issue, we keep a reference of the data.
# see https://github.com/pytorch/pytorch/issues/146431 .
self.allocator_and_pools[tag] = data
# lock gpu
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
self._vnpu_lock_gpu()
yield
# PyTorch's bug, calling torch.cuda.empty_cache() will error
# when using pluggable allocator, see
@@ -256,6 +285,8 @@ class CaMemAllocator:
# allocate memory.
# TODO: we need to find a way to release the memory,
# i.e. calling torch.cuda.empty_cache()
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
self.vnpu_unlock_gpu()
self.current_tag = old_tag
def get_current_usage(self) -> int:
@@ -267,3 +298,104 @@ class CaMemAllocator:
handle = data.handle
sum_bytes += handle[1]
return sum_bytes
def vnpu_try_lock_gpu(self) -> tuple[bool, bool]:
if python_try_lock_gpu:
return python_try_lock_gpu()
else:
return False, False
def _vnpu_lock_gpu(self) -> bool:
while True:
success, _ = self.vnpu_try_lock_gpu()
if success:
return True
time.sleep(0.001)
def vnpu_unlock_gpu(self):
if python_unlock_gpu:
python_unlock_gpu()
def get_pool_mem_info(self) -> tuple[int, int]:
"""Return (free, total) bytes of the reserved memory pool."""
return python_get_mem_info()
def offload_vram(
self,
offload_tags: tuple[str, ...] | str | None = None) -> None:
"""
Put the allocator in sleep mode.
All data in the memory allocation with the specified tag will be
offloaded to CPU memory, and others will be discarded.
:param offload_tags: The tags of the memory allocation that will be
offloaded. The rest of the memory allocation will be discarded.
"""
if offload_tags is None:
# by default, allocated tensors are offloaded
# when the allocator sleeps
offload_tags = (CaMemAllocator.default_tag, )
elif isinstance(offload_tags, str):
offload_tags = (offload_tags, )
assert isinstance(offload_tags, tuple)
sz_weights = 0
sz_kvcache = 0
for ptr, data in self.pointer_to_data.items():
handle = data.handle
if data.tag in offload_tags:
size_in_bytes = handle[1]
if data.cpu_backup_tensor is None:
cpu_backup_tensor = torch.empty(
size_in_bytes,
dtype=torch.uint8,
device='cpu',
pin_memory=True)
cpu_ptr = cpu_backup_tensor.data_ptr()
ACL_MEMCPY_DEVICE_TO_HOST = 2
dest_max = cpu_ptr + size_in_bytes * 2
memcpy(cpu_ptr, dest_max, ptr, size_in_bytes,
ACL_MEMCPY_DEVICE_TO_HOST)
data.cpu_backup_tensor = cpu_backup_tensor
unmap_and_release(handle)
sz_weights += size_in_bytes
else:
size_in_bytes = handle[1]
unmap_and_release(handle)
sz_kvcache += size_in_bytes
# self.requested_vram_size = sz_weights + sz_kvcache
self.vnpu_unlock_gpu()
# logger.info(f"offload: tags {offload_tags}: {sz_weights/(1024**3):.2f} GB, discard kv cache: {sz_kvcache/(1024**3):.2f} GB")
def try_reload_vram(self, tags: list[str] | None = None) -> tuple[bool, bool]:
succ, prev_is_self = self.vnpu_try_lock_gpu()
if not succ:
# did not get the lock
return False, prev_is_self
if prev_is_self:
# nothing to do
return succ, prev_is_self
for ptr, data in self.pointer_to_data.items():
handle = data.handle
if tags is None or data.tag in tags:
create_and_map(handle)
if data.cpu_backup_tensor is not None:
cpu_backup_tensor = data.cpu_backup_tensor
size_in_bytes = cpu_backup_tensor.numel(
) * cpu_backup_tensor.element_size()
cpu_ptr = cpu_backup_tensor.data_ptr()
ACL_MEMCPY_HOST_TO_DEVICE = 1
dest_max = ptr + size_in_bytes * 2
memcpy(ptr, dest_max, cpu_ptr, size_in_bytes,
ACL_MEMCPY_HOST_TO_DEVICE)
# data.cpu_backup_tensor = None
# TODO: check whether the re-memset is unnecessary when reset_prefix_cache is called
# else:
# size_in_bytes = handle[1]
# memset(ptr, size_in_bytes, 0, size_in_bytes)
return succ, prev_is_self
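Taken together, the new allocator methods implement a lock-guarded offload/reload cycle. A minimal usage sketch (illustrative only; it assumes VLLM_ASCEND_ENABLE_VNPU is set and that weights were already allocated inside allocator.use_memory_pool(tag="weights")):

allocator = CaMemAllocator.get_instance()

# Copy "weights" allocations to pinned host memory, discard everything else
# (e.g. the kv_cache tag), and release the device lock for other instances.
allocator.offload_vram(offload_tags=("weights",))

# When work arrives: try to re-acquire the lock and map the handles back.
succ, prev_is_self = allocator.try_reload_vram(tags=None)
if succ and not prev_is_self:
    # Weights were copied back from the CPU backups; the discarded kv_cache
    # pages come back uninitialized, so callers reset the prefix cache
    # (see the patched EngineCore loop below).
    pass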

View File

@@ -107,6 +107,7 @@ env_variables: dict[str, Callable[[], Any]] = {
"VLLM_ASCEND_FUSION_OP_TRANSPOSE_KV_CACHE_BY_BLOCK": lambda: bool(
int(os.getenv("VLLM_ASCEND_FUSION_OP_TRANSPOSE_KV_CACHE_BY_BLOCK", "1"))
),
"VLLM_ASCEND_ENABLE_VNPU": lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_VNPU", 1))),
}
# end-env-vars-definition
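Note that the default of "1" means vNPU offload mode is enabled unless the variable is explicitly set to 0. A small sketch of how the flag is parsed (same expression as above):

import os

# Any integer other than 0 enables vNPU mode; an unset variable defaults to enabled.
os.environ["VLLM_ASCEND_ENABLE_VNPU"] = "0"  # opt out explicitly
enabled = bool(int(os.getenv("VLLM_ASCEND_ENABLE_VNPU", "1")))
assert enabled is False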

View File

@@ -37,3 +37,6 @@ if os.getenv("DYNAMIC_EPLB", "false").lower() in ("true", "1") or os.getenv("EXP
if envs.VLLM_ASCEND_BALANCE_SCHEDULING:
import vllm_ascend.patch.platform.patch_balance_schedule # noqa
import vllm_ascend.patch.platform.patch_executor # noqa
import vllm_ascend.patch.platform.patch_core # noqa

View File

@@ -0,0 +1,151 @@
from logging import DEBUG
import os
import queue
import time
import vllm.envs as envs
from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import logger
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.core.kv_cache_utils import (generate_scheduler_kv_cache_config,
get_kv_cache_configs)
from vllm.v1.engine.core import EngineCoreProc, EngineCore
from vllm.tracing import instrument
import vllm_ascend.envs as envs_ascend
def run_busy_loop(self):
"""Core busy loop of the EngineCore."""
while self._handle_shutdown():
# 1) Poll the input queue until there is work to do.
self._process_input_queue()
if (
envs_ascend.VLLM_ASCEND_ENABLE_VNPU
and self.has_work()
and self.model_executor.is_offloaded()
):
prev_is_self = self.model_executor.reload_vram()
if not prev_is_self:
self.reset_prefix_cache()
# 2) Step the engine core and return the outputs.
self._process_engine_step()
if (
envs_ascend.VLLM_ASCEND_ENABLE_VNPU
and not self.has_work()
and not self.model_executor.is_offloaded()
):
self.model_executor.offload_vram()
raise SystemExit
def _process_input_queue(self):
"""Exits when an engine step needs to be performed."""
waited = False
while not self.has_work() and self.is_running():
# Notify callbacks waiting for engine to become idle.
self._notify_idle_state_callbacks()
if self.input_queue.empty():
# Drain aborts queue; all aborts are also processed via input_queue.
with self.aborts_queue.mutex:
self.aborts_queue.queue.clear()
if logger.isEnabledFor(DEBUG):
logger.debug("EngineCore waiting for work.")
waited = True
# vnpu offload if idle
if (
envs_ascend.VLLM_ASCEND_ENABLE_VNPU
and not self.model_executor.is_offloaded()
):
self.model_executor.offload_vram()
block = self.process_input_queue_block
try:
req = self.input_queue.get(block=block)
self._handle_client_request(*req)
except queue.Empty:
break
if not block:
break
if waited:
logger.debug("EngineCore loop active.")
# Handle any more client requests.
while not self.input_queue.empty():
req = self.input_queue.get_nowait()
self._handle_client_request(*req)
@instrument(span_name="Prepare model")
def _initialize_kv_caches(self, vllm_config: VllmConfig) -> KVCacheConfig:
start = time.time()
# Get all kv cache needed by the model
kv_cache_specs = self.model_executor.get_kv_cache_specs()
has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
if has_kv_cache:
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
# get available memory in idle offload mode
available_gpu_memory = (
self.model_executor.determine_available_memory_vnpu_offload_mode())
self.available_gpu_memory_for_kv_cache = \
available_gpu_memory[0]
elif envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
# NOTE(yongji): should already be set
# during _eep_scale_up_before_kv_init
assert self.available_gpu_memory_for_kv_cache > 0
available_gpu_memory = [self.available_gpu_memory_for_kv_cache] * len(
kv_cache_specs
)
else:
# Profiles the peak memory usage of the model to determine how
# much memory can be allocated for kv cache.
available_gpu_memory = self.model_executor.determine_available_memory()
self.available_gpu_memory_for_kv_cache = available_gpu_memory[0]
else:
# Attention free models don't need memory for kv cache
available_gpu_memory = [0] * len(kv_cache_specs)
assert len(kv_cache_specs) == len(available_gpu_memory)
# Track max_model_len before KV cache config to detect auto-fit changes
max_model_len_before = vllm_config.model_config.max_model_len
kv_cache_configs = get_kv_cache_configs(
vllm_config, kv_cache_specs, available_gpu_memory
)
# If auto-fit reduced max_model_len, sync the new value to workers.
# This is needed because workers were spawned before memory profiling
# and have the original (larger) max_model_len cached.
max_model_len_after = vllm_config.model_config.max_model_len
if max_model_len_after != max_model_len_before:
self.collective_rpc("update_max_model_len", args=(max_model_len_after,))
scheduler_kv_cache_config = generate_scheduler_kv_cache_config(kv_cache_configs)
vllm_config.cache_config.num_gpu_blocks = scheduler_kv_cache_config.num_blocks
kv_cache_groups = scheduler_kv_cache_config.kv_cache_groups
if kv_cache_groups:
vllm_config.cache_config.block_size = min(
g.kv_cache_spec.block_size for g in kv_cache_groups
)
vllm_config.validate_block_size()
# Initialize kv cache and warmup the execution
self.model_executor.initialize_from_config(kv_cache_configs)
elapsed = time.time() - start
logger.info_once(
"init engine (profile, create kv cache, warmup model) took %.2f seconds",
elapsed,
scope="local",
)
return scheduler_kv_cache_config
EngineCoreProc.run_busy_loop = run_busy_loop
EngineCoreProc._process_input_queue = _process_input_queue
EngineCore._initialize_kv_caches = _initialize_kv_caches
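The patched loop amounts to a small two-state machine around the existing busy loop. A condensed sketch of that decision follows; the function name and call shape here are illustrative only, not part of the patch:

def _vnpu_gate(executor, has_work: bool, reset_prefix_cache) -> None:
    """Illustrative only: the gating the patch adds around run_busy_loop."""
    if has_work and executor.is_offloaded():
        # Re-acquire the device lock and map weights back before stepping.
        prev_is_self = executor.reload_vram()
        if not prev_is_self:
            # Another instance used the device meanwhile; kv_cache was discarded.
            reset_prefix_cache()
    elif not has_work and not executor.is_offloaded():
        # Idle: free HBM and release the lock so another instance can run.
        executor.offload_vram()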

View File

@@ -0,0 +1,52 @@
import time
from vllm.v1.executor.abstract import logger, Executor
def is_offloaded(self) -> bool:
if not hasattr(self, "_is_offloaded"):
self._is_offloaded = False
return self._is_offloaded
def offload_vram(self):
if self.is_offloaded():
logger.warning("Executor is already offloaded.")
return
time_before_offload = time.perf_counter()
self.collective_rpc("offload_vram")
time_after_offload = time.perf_counter()
self._is_offloaded = True
logger.info(f"Offloading VRAM costs {time_after_offload - time_before_offload:.6f} seconds.")
def reload_vram(self) -> bool:
if not self.is_offloaded():
logger.warning("Executor is not offloaded.")
return True
while True:
time_before_reload = time.perf_counter()
res = self.collective_rpc("try_reload_vram")
time_after_reload = time.perf_counter()
succ = all(x[0] for x in res)
if succ:
self._is_offloaded = False
logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
prev_is_self = all(x[1] for x in res)
return prev_is_self
else:
# some workers did not get the lock; release and retry
self.collective_rpc("vnpu_unlock_gpu")
time.sleep(0.001)
def determine_available_memory_vnpu_offload_mode(self) -> list[int]:
return self.collective_rpc("determine_available_memory_vnpu_offload_mode")
Executor.is_offloaded = is_offloaded
Executor.offload_vram = offload_vram
Executor.reload_vram = reload_vram
Executor.determine_available_memory_vnpu_offload_mode = determine_available_memory_vnpu_offload_mode
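reload_vram treats the collective RPC as an all-or-nothing lock acquisition. A minimal sketch of that reduction, with a hypothetical two-worker result list:

# Each worker's try_reload_vram returns (got_lock, prev_holder_was_self).
res = [(True, False), (True, False)]          # hypothetical 2-worker result
succ = all(got_lock for got_lock, _ in res)   # reload only if every worker got the lock
prev_is_self = all(prev for _, prev in res)   # skip prefix-cache reset only if all agree
# If succ is False, the patch calls collective_rpc("vnpu_unlock_gpu") and retries.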

View File

@@ -485,7 +485,11 @@ class NPUPlatform(Platform):
# Find more details at https://docs.vllm.ai/projects/ascend/en/latest/faqs.html#how-to-handle-the-out-of-memory-issue
# NOTE: We should not set this environment variable in RL (sleep mode) scenarios.
# Find more details about how to configure this environment variable at https://www.hiascend.com/document/detail/zh/Pytorch/720/comref/Envvariables/Envir_012.html
if model_config and not model_config.enable_sleep_mode:
if (
model_config
and not model_config.enable_sleep_mode
and not envs_ascend.VLLM_ASCEND_ENABLE_VNPU
):
npu_alloc_configs = os.getenv("PYTORCH_NPU_ALLOC_CONF", "expandable_segments:True")
# This environment variable may have more than one key-value pairs.
# We should append ",expandable_segments:True" to the current configs.

View File

@@ -265,7 +265,10 @@ class NPUWorker(WorkerBase):
# take current memory snapshot
self.init_snapshot = MemorySnapshot()
self.requested_memory = self.init_snapshot.total_memory * self.cache_config.gpu_memory_utilization
if self.init_snapshot.free_memory < self.requested_memory:
if (
self.init_snapshot.free_memory < self.requested_memory
and not envs_ascend.VLLM_ASCEND_ENABLE_VNPU
):
GiB = lambda b: round(b / GiB_bytes, 2)
raise ValueError(
f"Free memory on device "
@@ -360,6 +363,28 @@ class NPUWorker(WorkerBase):
return int(self.available_kv_cache_memory_bytes)
@torch.inference_mode()
def determine_available_memory_vnpu_offload_mode(self) -> int:
GiB = lambda b: b / GiB_bytes
allocator = CaMemAllocator.get_instance()
free, total = allocator.get_pool_mem_info()
if self.cache_config.gpu_memory_utilization <= 0.9:
logger.warning(
"GPU memory utilization is set to %.2f. For VNPU mode, it is recommended to set gpu_memory_utilization to a larger value",
self.cache_config.gpu_memory_utilization,
)
available_kv_cache_memory = int(
total * self.cache_config.gpu_memory_utilization - (total - free)
)
available_kv_cache_memory = int(max(available_kv_cache_memory, 0))
self.available_kv_cache_memory_bytes = available_kv_cache_memory
logger.info_once(
"Available KV cache memory: %.2f GiB",
GiB(self.available_kv_cache_memory_bytes),
scope="local",
)
return int(self.available_kv_cache_memory_bytes)
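# Worked example of the formula above (hypothetical numbers): with a reserved
# pool of total = 60 GiB, free = 58 GiB and gpu_memory_utilization = 0.95,
#   available = max(60 * 0.95 - (60 - 58), 0) = 55 GiB
# i.e. the 2 GiB already consumed from the pool (e.g. model weights) is
# charged against the utilization budget before sizing the KV cache.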
def execute_model(
self,
scheduler_output: "SchedulerOutput",
@@ -431,6 +456,12 @@ class NPUWorker(WorkerBase):
allocator = CaMemAllocator.get_instance()
assert allocator.get_current_usage() == 0, "Sleep mode can only be used for one instance per process."
context = allocator.use_memory_pool(tag="weights")
elif envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
allocator = CaMemAllocator.get_instance()
assert (
allocator.get_current_usage() == 0
), "vNPU mode can only be used for one instance per process."
context = allocator.use_memory_pool(tag="weights")
else:
from contextlib import nullcontext
@@ -438,6 +469,23 @@ class NPUWorker(WorkerBase):
with context, set_current_vllm_config(self.vllm_config):
self.model_runner.load_model()
if envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
# offload weights to host once, then reload: this creates the CPU backup tensors and leaves this process holding the device lock
self.offload_vram()
succ, _ = self.try_reload_vram()
assert succ, "Failed to reload model weights after offloading."
def offload_vram(self) -> None:
allocator = CaMemAllocator.get_instance()
allocator.offload_vram(offload_tags=("weights",))
def try_reload_vram(self) -> tuple[bool, bool]:
allocator = CaMemAllocator.get_instance()
return allocator.try_reload_vram(tags=None)
def vnpu_unlock_gpu(self) -> None:
allocator = CaMemAllocator.get_instance()
allocator.vnpu_unlock_gpu()
def compile_or_warm_up_model(self) -> float:
# Note: need to adapt for graph mode.
@@ -517,6 +565,9 @@ class NPUWorker(WorkerBase):
if self.vllm_config.model_config.enable_sleep_mode:
allocator = CaMemAllocator.get_instance()
context = allocator.use_memory_pool(tag="kv_cache")
elif envs_ascend.VLLM_ASCEND_ENABLE_VNPU:
allocator = CaMemAllocator.get_instance()
context = allocator.use_memory_pool(tag="kv_cache")
else:
from contextlib import nullcontext