vllm-ascend vnpu v1

This commit is contained in:
starkwj
2025-12-26 07:37:35 +00:00
parent 2f1aed98cc
commit 135cc0a505
168 changed files with 28337 additions and 9 deletions

View File

@@ -20,10 +20,12 @@ import dataclasses
import os
from contextlib import contextmanager
from typing import Any, Callable, Dict, Optional, Tuple, Union
import time
import torch
from acl.rt import memcpy # type: ignore # noqa: F401
from acl.rt import memcpy, memset # type: ignore # noqa: F401
from vllm.logger import logger
import vllm_ascend.envs as envs_ascend
from vllm_ascend.platform import NPUPlatform
@@ -56,8 +58,20 @@ def find_loaded_library(lib_name) -> Optional[str]:
camem_available = False
try:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module, python_create_and_map, python_unmap_and_release)
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module_offload as init_module,
python_create_and_map_offload as python_create_and_map,python_unmap_and_release_offload as python_unmap_and_release,
python_get_mem_info_offload as python_get_mem_info,
python_lock_gpu_offload as python_lock_gpu,
python_unlock_gpu_offload as python_unlock_gpu
)
else:
from vllm_ascend.vllm_ascend_C import ( # type: ignore # noqa: F401
init_module, python_create_and_map, python_unmap_and_release)
python_get_mem_info = None
python_lock_gpu = None
python_unlock_gpu = None
lib_name = find_loaded_library("vllm_ascend_C")
camem_available = True
except ImportError as e:
@@ -66,6 +80,9 @@ except ImportError as e:
init_module = None
python_create_and_map = None
python_unmap_and_release = None
python_get_mem_info = None
python_lock_gpu = None
python_unlock_gpu = None
lib_name = None
libcudart = None
@@ -93,8 +110,14 @@ def get_pluggable_allocator(
python_free_func: Callable[[int], tuple[int, int, int, int]]
) -> torch.npu.memory.NPUPluggableAllocator:
init_module(python_malloc_fn, python_free_func)
new_alloc = torch.npu.memory.NPUPluggableAllocator(lib_name, 'my_malloc',
'my_free')
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc_offload', 'my_free_offload'
)
else:
new_alloc = torch.npu.memory.NPUPluggableAllocator(
lib_name, 'my_malloc', 'my_free'
)
return new_alloc
@@ -153,6 +176,7 @@ class CaMemAllocator:
self.pointer_to_data: Dict[int, AllocationData] = {}
self.current_tag: str = CaMemAllocator.default_tag
self.allocator_and_pools: Dict[str, Any] = {}
# self.requested_vram_size = 0
def python_malloc_callback(self, allocation_handle: HandleType) -> None:
"""
@@ -254,6 +278,9 @@ class CaMemAllocator:
# to avoid the issue, we keep a reference of the data.
# see https://github.com/pytorch/pytorch/issues/146431 .
self.allocator_and_pools[tag] = data
# lock gpu
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
self.vnpu_lock_gpu()
yield
# PyTorch's bug, calling torch.cuda.empty_cache() will error
# when using pluggable allocator, see
@@ -265,6 +292,8 @@ class CaMemAllocator:
# allocate memory.
# TODO: we need to find a way to release the memory,
# i.e. calling torch.cuda.empty_cache()
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
self.vnpu_unlock_gpu()
self.current_tag = old_tag
def get_current_usage(self) -> int:
@@ -276,3 +305,100 @@ class CaMemAllocator:
handle = data.handle
sum_bytes += handle[1]
return sum_bytes
def vnpu_lock_gpu(self) -> bool:
    """Try to acquire the NPU lock through the offload C extension.

    Returns the extension's result, or ``False`` when the offload
    extension was not loaded (``python_lock_gpu`` is ``None``).
    """
    if python_lock_gpu is None:
        return False
    return python_lock_gpu()
def vnpu_unlock_gpu(self):
    """Release the NPU lock; a no-op when the offload extension is absent."""
    if python_unlock_gpu is not None:
        python_unlock_gpu()
def get_pool_mem_info(self) -> Tuple[int, int]:
    """Return ``(free, total)`` bytes of the reserved memory pool.

    The return annotation was previously ``int``, but callers unpack the
    result as ``free, total`` — ``python_get_mem_info`` yields a pair.

    :raises RuntimeError: if the offload C extension is not loaded
        (``python_get_mem_info`` is ``None``), instead of an opaque
        "'NoneType' object is not callable" TypeError.
    """
    if python_get_mem_info is None:
        raise RuntimeError(
            "python_get_mem_info is unavailable: idle offload mode "
            "requires the vllm_ascend_C offload extension.")
    return python_get_mem_info()
def offload_vram(
        self,
        offload_tags: Optional[Union[Tuple[str, ...], str]] = None) -> None:
    """Offload tagged allocations to pinned CPU memory; discard the rest.

    Every allocation whose tag is in ``offload_tags`` is copied into a
    pinned CPU backup tensor (unless a backup already exists from a
    previous offload) and then unmapped.  Allocations with any other tag
    are unmapped without a backup, so their contents are lost.

    :param offload_tags: The tag(s) of the memory allocations that will
        be offloaded; defaults to the allocator's default tag.
    """
    if offload_tags is None:
        # By default, allocated tensors are offloaded
        # when the allocator sleeps.
        offload_tags = (CaMemAllocator.default_tag, )
    elif isinstance(offload_tags, str):
        offload_tags = (offload_tags, )
    assert isinstance(offload_tags, tuple)
    sz_weights = 0
    sz_kvcache = 0
    for ptr, data in self.pointer_to_data.items():
        handle = data.handle
        size_in_bytes = handle[1]
        if data.tag in offload_tags:
            if data.cpu_backup_tensor is None:
                cpu_backup_tensor = torch.empty(
                    size_in_bytes,
                    dtype=torch.uint8,
                    device='cpu',
                    pin_memory=NPUPlatform.is_pin_memory_available())
                cpu_ptr = cpu_backup_tensor.data_ptr()
                ACL_MEMCPY_DEVICE_TO_HOST = 2
                # acl.rt.memcpy's second argument (destMax) is the
                # destination *capacity in bytes*, not an address; the
                # backup tensor holds exactly size_in_bytes bytes.
                # (Previously this passed cpu_ptr + size_in_bytes * 2,
                # mixing an address into a size argument — TODO confirm
                # against the pyACL reference.)
                memcpy(cpu_ptr, size_in_bytes, ptr, size_in_bytes,
                       ACL_MEMCPY_DEVICE_TO_HOST)
                data.cpu_backup_tensor = cpu_backup_tensor
            unmap_and_release(handle)
            sz_weights += size_in_bytes
        else:
            # Not selected for backup: discard (e.g. KV cache).
            unmap_and_release(handle)
            sz_kvcache += size_in_bytes
    # Release the NPU lock so other consumers can use the freed memory.
    self.vnpu_unlock_gpu()
def reload_vram(self, tags: Optional[list[str]] = None) -> bool:
    """Map previously offloaded allocations back onto the NPU.

    Re-acquires the NPU lock, re-creates device mappings for the selected
    tags, and copies CPU backups back into device memory.  The CPU backup
    tensors are intentionally kept so a later offload can skip the
    device-to-host copy.

    :param tags: Only allocations whose tag is in this list are restored;
        ``None`` restores everything.
    :return: ``True`` if this process already held the NPU lock (nothing
        was reloaded), ``False`` if memory was actually reloaded.
    """
    prev_is_self = self.vnpu_lock_gpu()
    if prev_is_self:
        # We never released the lock, so the memory is still mapped.
        return True
    for ptr, data in self.pointer_to_data.items():
        handle = data.handle
        if tags is None or data.tag in tags:
            create_and_map(handle)
            if data.cpu_backup_tensor is not None:
                cpu_backup_tensor = data.cpu_backup_tensor
                size_in_bytes = (cpu_backup_tensor.numel() *
                                 cpu_backup_tensor.element_size())
                cpu_ptr = cpu_backup_tensor.data_ptr()
                ACL_MEMCPY_HOST_TO_DEVICE = 1
                # destMax is the destination capacity in bytes (the new
                # device mapping is size_in_bytes long), not an address.
                # (Previously ptr + size_in_bytes * 2 — TODO confirm
                # against the pyACL reference.)
                memcpy(ptr, size_in_bytes, cpu_ptr, size_in_bytes,
                       ACL_MEMCPY_HOST_TO_DEVICE)
    # NOTE(review): allocations without a CPU backup (e.g. KV cache) are
    # remapped but not zeroed; callers appear to rely on
    # reset_prefix_cache() making the stale contents unreachable.
    return False

View File

@@ -166,7 +166,8 @@ env_variables: Dict[str, Callable[[], Any]] = {
lambda: bool(int(os.getenv("VLLM_ASCEND_ENABLE_MLAPO", '0'))),
# Whether to enable transpose weight and cast format to FRACTAL_NZ.
"VLLM_ASCEND_ENABLE_NZ":
lambda: int(os.getenv("VLLM_ASCEND_ENABLE_NZ", 1)),
lambda: int(os.getenv("VLLM_ASCEND_ENABLE_NZ", 0)),
"VLLM_ASCEND_ENABLE_IDLE_OFFLOAD": lambda: int(os.getenv("VLLM_ASCEND_ENABLE_IDLE_OFFLOAD", 1)),
}
# end-env-vars-definition

View File

@@ -28,3 +28,4 @@ if os.getenv("DYNAMIC_EPLB", "false") == "true" or os.getenv(
if os.getenv("SHM_BARRIER", "true") == "true":
import vllm_ascend.patch.platform.patch_core # noqa
import vllm_ascend.patch.platform.patch_message_queue # noqa
import vllm_ascend.patch.platform.patch_executor # noqa

View File

@@ -1,12 +1,19 @@
import signal
from typing import Optional
from logging import DEBUG
import time
from vllm.config import ParallelConfig
from vllm.config import ParallelConfig, VllmConfig
from vllm.logger import logger
from vllm.transformers_utils.config import \
maybe_register_config_serialize_by_value
from vllm.utils import decorate_logs, set_process_title
from vllm.v1.engine.core import DPEngineCoreProc, EngineCoreProc
from vllm.v1.kv_cache_interface import KVCacheConfig
from vllm.v1.core.kv_cache_utils import (generate_scheduler_kv_cache_config,
get_kv_cache_configs)
from vllm.v1.engine.core import DPEngineCoreProc, EngineCoreProc, EngineCore
import vllm_ascend.envs as envs_ascend
def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
@@ -66,3 +73,101 @@ def run_engine_core(*args, dp_rank: int = 0, local_dp_rank: int = 0, **kwargs):
EngineCoreProc.run_engine_core = run_engine_core
def run_busy_loop(self):
    """Core busy loop of the EngineCore."""
    # Loop until the process receives SIGINT or SIGTERM.
    while True:
        # 1) Poll the input queue until there is work to do.
        self._process_input_queue()
        offload_enabled = envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD
        # Wake the executor back up before stepping if work arrived while
        # VRAM was offloaded.
        if (offload_enabled and self.scheduler.has_requests()
                and self.model_executor.is_offloaded):
            if not self.model_executor.reload_vram():
                # Memory was actually remapped -> cached prefixes are stale.
                self.reset_prefix_cache()
        # 2) Step the engine core and return the outputs.
        self._process_engine_step()
        # Offload again once the engine has drained all requests.
        if (offload_enabled and not self.scheduler.has_requests()
                and not self.model_executor.is_offloaded):
            self.model_executor.offload_vram()
def _process_input_queue(self):
    """Exits when an engine step needs to be performed."""
    logged_wait = False
    while (not self.engines_running and not self.scheduler.has_requests()
           and not self.batch_queue):
        if logger.isEnabledFor(DEBUG) and self.input_queue.empty():
            logger.debug("EngineCore waiting for work.")
            logged_wait = True
        # Idle with nothing queued: offload VRAM while we block on input.
        if (envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD
                and not self.model_executor.is_offloaded):
            self.model_executor.offload_vram()
        request = self.input_queue.get()
        self._handle_client_request(*request)
    if logged_wait:
        logger.debug("EngineCore loop active.")
    # Drain any remaining client requests without blocking.
    while not self.input_queue.empty():
        request = self.input_queue.get_nowait()
        self._handle_client_request(*request)


EngineCoreProc.run_busy_loop = run_busy_loop
EngineCoreProc._process_input_queue = _process_input_queue
def _initialize_kv_caches(
        self, vllm_config: VllmConfig) -> tuple[int, int, KVCacheConfig]:
    """Size and initialize the KV caches, then warm up the executor.

    :param vllm_config: The engine's full configuration.
    :return: ``(num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config)``.
    """
    # Local import: this patch module does not import `os` at top level,
    # so the elastic-EP branch below raised NameError without it.
    import os

    start = time.time()
    # Get all KV-cache specs needed by the model.
    kv_cache_specs = self.model_executor.get_kv_cache_specs()
    has_kv_cache = any(kv_cache_spec for kv_cache_spec in kv_cache_specs)
    if has_kv_cache:
        if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
            # Idle-offload mode: the usable memory comes from the reserved
            # pool bookkeeping rather than a profiling run.
            available_gpu_memory = (
                self.model_executor.determine_available_memory_idle_offload_mode())
            self.available_gpu_memory_for_kv_cache = \
                available_gpu_memory[0]
        elif os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1":
            # Scale-up launch: adopt the KV-cache memory size agreed on by
            # the data-parallel group instead of re-profiling.
            dp_group = getattr(self, "dp_group", None)
            assert dp_group is not None
            self.available_gpu_memory_for_kv_cache = \
                ParallelConfig.sync_kv_cache_memory_size(dp_group, -1)
            available_gpu_memory = [
                self.available_gpu_memory_for_kv_cache
            ] * len(kv_cache_specs)
        else:
            # Profile the peak memory usage of the model to determine how
            # much memory can be allocated for the KV cache.
            available_gpu_memory = (
                self.model_executor.determine_available_memory())
            self.available_gpu_memory_for_kv_cache = \
                available_gpu_memory[0]
    else:
        # Attention-free models don't need memory for the KV cache.
        available_gpu_memory = [0] * len(kv_cache_specs)

    # One memory figure per worker, matching the per-worker specs.
    assert len(kv_cache_specs) == len(available_gpu_memory)
    kv_cache_configs = get_kv_cache_configs(vllm_config, kv_cache_specs,
                                            available_gpu_memory)
    scheduler_kv_cache_config = generate_scheduler_kv_cache_config(
        kv_cache_configs)
    num_gpu_blocks = scheduler_kv_cache_config.num_blocks
    num_cpu_blocks = 0

    # Initialize the KV cache and warm up the execution.
    self.model_executor.initialize_from_config(kv_cache_configs)

    elapsed = time.time() - start
    logger.info(("init engine (profile, create kv cache, "
                 "warmup model) took %.2f seconds"), elapsed)
    return num_gpu_blocks, num_cpu_blocks, scheduler_kv_cache_config


EngineCore._initialize_kv_caches = _initialize_kv_caches

View File

@@ -0,0 +1,44 @@
import time
from vllm.executor.executor_base import logger, ExecutorBase
# Keep a reference to the stock constructor so the patched one can delegate.
original_init = ExecutorBase.__init__


def init(self, *args, **kwargs):
    """Patched ExecutorBase.__init__ that also tracks offload state."""
    original_init(self, *args, **kwargs)
    # True while the workers' VRAM is offloaded to host memory.
    self.is_offloaded = False
def offload_vram(self):
    """Offload VRAM on every worker and mark the executor as offloaded."""
    if self.is_offloaded:
        logger.warning("Executor is already offloaded.")
        return
    started = time.perf_counter()
    self.collective_rpc("offload_vram")
    elapsed = time.perf_counter() - started
    self.is_offloaded = True
    logger.info(f"Offloading VRAM costs {elapsed:.6f} seconds.")
def reload_vram(self) -> bool:
    """Reload offloaded VRAM on every worker.

    :return: ``True`` when the NPU lock was already held by this
        instance (nothing to reload), ``False`` when memory was
        actually reloaded.
    """
    if not self.is_offloaded:
        logger.warning("Executor is not offloaded.")
        return True
    time_before_reload = time.perf_counter()
    # collective_rpc returns one result per worker (a list).  Returning
    # the raw list here made `not reload_vram()` always False at the
    # caller; reduce the per-worker booleans to match the declared
    # `bool` return type.  All workers should agree — `all()` is
    # conservative if they do not.
    prev_is_self = all(self.collective_rpc("reload_vram"))
    time_after_reload = time.perf_counter()
    self.is_offloaded = False
    logger.info(f"Reloading VRAM costs {time_after_reload - time_before_reload:.6f} seconds.")
    return prev_is_self
def determine_available_memory_idle_offload_mode(self) -> list[int]:
    """Collect per-worker available KV-cache memory (idle-offload mode).

    ``collective_rpc`` returns one value per worker, so the result is a
    list (the caller indexes ``[0]``); the previous ``-> int`` annotation
    was wrong.  This mirrors ``ExecutorBase.determine_available_memory``.
    """
    return self.collective_rpc("determine_available_memory_idle_offload_mode")


ExecutorBase.__init__ = init
ExecutorBase.offload_vram = offload_vram
ExecutorBase.reload_vram = reload_vram
ExecutorBase.determine_available_memory_idle_offload_mode = determine_available_memory_idle_offload_mode

View File

@@ -258,6 +258,17 @@ class NPUWorker(WorkerBase):
)
return available_kv_cache_memory
def determine_available_memory_idle_offload_mode(self) -> int:
    """Compute this worker's KV-cache memory budget from the reserved pool.

    Budget = total * gpu_memory_utilization minus what the pool already
    has in use, clamped to zero.
    """
    allocator = CaMemAllocator.get_instance()
    free, total = allocator.get_pool_mem_info()
    budget = total * self.cache_config.gpu_memory_utilization
    in_use = total - free
    available_kv_cache_memory = max(int(budget - in_use), 0)
    logger.info(
        f"Available memory (idle offload mode): {available_kv_cache_memory}, total memory: {total}"
    )
    return available_kv_cache_memory
def execute_model(
self,
scheduler_output: "SchedulerOutput",
@@ -306,11 +317,46 @@ class NPUWorker(WorkerBase):
"Sleep mode can only be "
"used for one instance per process.")
context = allocator.use_memory_pool(tag="weights")
elif envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
if not sleep_mode_enabled():
raise ValueError(
"Sleep mode is not enabled. Please compile vllm-ascend with COMPILE_CUSTOM_KERNELS=1."
)
if is_enable_nz():
raise ValueError(
"FRACTAL_NZ mode is enabled. This may cause model parameter precision issues "
"in the RL scenarios. Please set VLLM_ASCEND_ENABLE_NZ=0.")
allocator = CaMemAllocator.get_instance()
assert allocator.get_current_usage() == 0, (
"Idle offload mode can only be "
"used for one instance per process.")
context = allocator.use_memory_pool(tag="weights")
else:
from contextlib import nullcontext
context = nullcontext() # type: ignore
with context:
self.model_runner.load_model()
if envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
# save memory to host with lock
self.offload_vram()
self.reload_vram()
def offload_vram(self) -> None:
    """Back up the "weights" allocations to pinned host memory and free
    their NPU VRAM via the CaMem allocator."""
    allocator = CaMemAllocator.get_instance()
    allocator.offload_vram(offload_tags=("weights", ))
def reload_vram(self) -> bool:
    """Reload all offloaded allocations onto the NPU.

    :return: True if the allocator's lock was already held (no-op).
    """
    return CaMemAllocator.get_instance().reload_vram(tags=None)
def compile_or_warm_up_model(self) -> None:
# Note: need to adapt for graph mode.
@@ -351,6 +397,9 @@ class NPUWorker(WorkerBase):
if self.vllm_config.model_config.enable_sleep_mode:
allocator = CaMemAllocator.get_instance()
context = allocator.use_memory_pool(tag="kv_cache")
elif envs_ascend.VLLM_ASCEND_ENABLE_IDLE_OFFLOAD:
allocator = CaMemAllocator.get_instance()
context = allocator.use_memory_pool(tag="kv_cache")
else:
from contextlib import nullcontext
context = nullcontext() # type: ignore