From 7265dc090d0ef0d313d7a3ea6ddf0bc64cb2de07 Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Mon, 21 Jul 2025 19:43:30 +0800 Subject: [PATCH] [2/4][Refactor] Refactor torchair utils (#1892) There is a lot torchair specified logic in common code. It results hard code maintenance. We will create a new torchair module to launch torchair related logic there. I plan to add 4 PR. 1. Refactor worker 2. Refactor utils (this PR) - simple change that move all torchair related util function to torchair module 3. Refactor model_runner 4. Refactor attention - vLLM version: v0.9.2 - vLLM main: https://github.com/vllm-project/vllm/commit/8188196a1c8af26134d8e366ebe564c18fb95379 Signed-off-by: wangxiyuan --- tests/ut/__init__.py | 0 tests/ut/test_utils.py | 21 ----- tests/ut/torchair/__init__.py | 0 tests/ut/torchair/test_utils.py | 28 ++++++ vllm_ascend/attention/mla_v1.py | 4 +- vllm_ascend/ops/fused_moe.py | 4 +- vllm_ascend/quantization/w8a8_dynamic.py | 4 +- vllm_ascend/torchair/torchair_worker.py | 8 +- vllm_ascend/torchair/utils.py | 98 +++++++++++++++++++++ vllm_ascend/utils.py | 104 +---------------------- vllm_ascend/worker/model_runner_v1.py | 7 +- 11 files changed, 142 insertions(+), 136 deletions(-) create mode 100644 tests/ut/__init__.py create mode 100644 tests/ut/torchair/__init__.py create mode 100644 tests/ut/torchair/test_utils.py create mode 100644 vllm_ascend/torchair/utils.py diff --git a/tests/ut/__init__.py b/tests/ut/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/ut/test_utils.py b/tests/ut/test_utils.py index 0bfe26b..6eb96df 100644 --- a/tests/ut/test_utils.py +++ b/tests/ut/test_utils.py @@ -290,27 +290,6 @@ class TestUtils(TestBase): 3, len(test_vllm_config.compilation_config.cudagraph_capture_sizes)) - def test_get_torchair_current_work_dir(self): - cache_dir = utils.TORCHAIR_CACHE_DIR - work_dir = utils.get_torchair_current_work_dir() - self.assertEqual(cache_dir, work_dir) - work_dir = utils.get_torchair_current_work_dir("test") - self.assertEqual(os.path.join(cache_dir, "test"), work_dir) - - def test_torchair_cache_dir(self): - utils.write_kv_cache_bytes_to_file(0, 100) - self.assertTrue(utils.check_torchair_cache_exist(), - "Create torchair cache dir failed") - self.assertTrue(utils.check_kv_cache_bytes_cache_exist(), - "Create kv cache bytes cache dir failed") - kv_cache_bytes = utils.read_kv_cache_bytes_from_file(0) - self.assertEqual(100, kv_cache_bytes) - utils.delete_torchair_cache_file() - self.assertFalse(utils.check_torchair_cache_exist(), - "Delete torchair cache dir failed") - self.assertFalse(utils.check_kv_cache_bytes_cache_exist(), - "Delete kv cache bytes cache dir failed") - @mock.patch("vllm.model_executor.custom_op.CustomOp") @mock.patch("vllm_ascend.ops.activation.AscendQuickGELU") @mock.patch("vllm_ascend.ops.activation.AscendSiluAndMul") diff --git a/tests/ut/torchair/__init__.py b/tests/ut/torchair/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/ut/torchair/test_utils.py b/tests/ut/torchair/test_utils.py new file mode 100644 index 0000000..b2b3c65 --- /dev/null +++ b/tests/ut/torchair/test_utils.py @@ -0,0 +1,28 @@ +import os + +from tests.ut.base import TestBase +from vllm_ascend.torchair import utils + + +class TestTorchairUtils(TestBase): + + def test_get_torchair_current_work_dir(self): + cache_dir = utils.TORCHAIR_CACHE_DIR + work_dir = utils._get_torchair_current_work_dir() + self.assertEqual(cache_dir, work_dir) + work_dir = utils._get_torchair_current_work_dir("test") + self.assertEqual(os.path.join(cache_dir, "test"), work_dir) + + def test_torchair_cache_dir(self): + utils.write_kv_cache_bytes_to_file(0, 100) + self.assertTrue(utils.check_torchair_cache_exist(), + "Create torchair cache dir failed") + self.assertTrue(utils.check_kv_cache_bytes_cache_exist(), + "Create kv cache bytes cache dir failed") + kv_cache_bytes = utils.read_kv_cache_bytes_from_file(0) + self.assertEqual(100, kv_cache_bytes) + utils.delete_torchair_cache_file() + self.assertFalse(utils.check_torchair_cache_exist(), + "Delete torchair cache dir failed") + self.assertFalse(utils.check_kv_cache_bytes_cache_exist(), + "Delete kv cache bytes cache dir failed") diff --git a/vllm_ascend/attention/mla_v1.py b/vllm_ascend/attention/mla_v1.py index 37e9454..3accb32 100644 --- a/vllm_ascend/attention/mla_v1.py +++ b/vllm_ascend/attention/mla_v1.py @@ -21,8 +21,8 @@ from vllm_ascend.multistream.base import MSAttentionMetadataSplitConfig from vllm_ascend.multistream.context import get_multistream_comm_context from vllm_ascend.multistream.ms_split import model_input_split_v1_mla_attn from vllm_ascend.ops.attention import vanilla_chunked_prefill_mla -from vllm_ascend.utils import (npu_prefetch, npu_stream_switch, - npu_wait_tensor, vllm_version_is) +from vllm_ascend.torchair.utils import npu_stream_switch, npu_wait_tensor +from vllm_ascend.utils import npu_prefetch, vllm_version_is from vllm_ascend.worker.npu_input_batch import InputBatch if TYPE_CHECKING: diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 1111f22..8697477 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -43,10 +43,10 @@ from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.distributed.communication_op import \ data_parallel_reduce_scatter from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer +from vllm_ascend.torchair.utils import npu_stream_switch, npu_wait_tensor from vllm_ascend.utils import (FusedMoEState, dispose_tensor, get_all_reduce_merge_state, get_fused_moe_state, - get_rm_router_logits_state, is_310p, - npu_stream_switch, npu_wait_tensor) + get_rm_router_logits_state, is_310p) MOE_ALL2ALL_BUFFER: bool = envs_ascend.MOE_ALL2ALL_BUFFER SELECT_GATING_TOPK_SOTFMAX_EXPERTS: bool = envs_ascend.SELECT_GATING_TOPK_SOTFMAX_EXPERTS diff --git a/vllm_ascend/quantization/w8a8_dynamic.py b/vllm_ascend/quantization/w8a8_dynamic.py index 0093578..0d2e102 100644 --- a/vllm_ascend/quantization/w8a8_dynamic.py +++ b/vllm_ascend/quantization/w8a8_dynamic.py @@ -26,9 +26,9 @@ from vllm.distributed.parallel_state import get_ep_group import vllm_ascend.envs as envs from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.ops.fused_moe import select_experts +from vllm_ascend.torchair.utils import npu_stream_switch, npu_wait_tensor from vllm_ascend.utils import (ACL_FORMAT_FRACTAL_NZ, FusedMoEState, - dispose_tensor, get_fused_moe_state, - npu_stream_switch, npu_wait_tensor) + dispose_tensor, get_fused_moe_state) def apply_mlp(hidden_states: torch.Tensor, diff --git a/vllm_ascend/torchair/torchair_worker.py b/vllm_ascend/torchair/torchair_worker.py index dd426e3..f5e2f21 100644 --- a/vllm_ascend/torchair/torchair_worker.py +++ b/vllm_ascend/torchair/torchair_worker.py @@ -17,10 +17,10 @@ import torch from vllm.logger import logger import vllm_ascend.envs as envs_ascend -from vllm_ascend.utils import (check_kv_cache_bytes_cache_exist, - check_torchair_cache_exist, - delete_torchair_cache_file, - read_kv_cache_bytes_from_file) +from vllm_ascend.torchair.utils import (check_kv_cache_bytes_cache_exist, + check_torchair_cache_exist, + delete_torchair_cache_file, + read_kv_cache_bytes_from_file) from vllm_ascend.worker.worker_v1 import NPUWorker diff --git a/vllm_ascend/torchair/utils.py b/vllm_ascend/torchair/utils.py new file mode 100644 index 0000000..f1a6138 --- /dev/null +++ b/vllm_ascend/torchair/utils.py @@ -0,0 +1,98 @@ +import fcntl +import os +import shutil +from contextlib import contextmanager, nullcontext + +import torch + +try: + # Recent release of torchair has moved these ops to `.scope`. + from torchair.scope import npu_stream_switch as _npu_stream_switch + from torchair.scope import npu_wait_tensor as _npu_wait_tensor +except ImportError: + from torchair.ops import NpuStreamSwitch as _npu_stream_switch + from torchair.ops import npu_wait_tensor as _npu_wait_tensor + +KV_CACHE_BYTES_CACHE_PATH_NAME = ".kv_cache_bytes" +KV_CACHE_BYTES_CACHE_FILE_NAME = "kv_cache_bytes" +TORCHAIR_CACHE_PATH_NAME = ".torchair_cache" +TORCHAIR_CACHE_DIR = os.getenv( + 'TORCHAIR_CACHE_HOME', os.path.join(os.getcwd(), TORCHAIR_CACHE_PATH_NAME)) + + +@contextmanager +def _file_lock(file_descriptor, lock_type): + fcntl.flock(file_descriptor, lock_type) + try: + yield + finally: + fcntl.flock(file_descriptor, fcntl.LOCK_UN) + + +def _get_torchair_current_work_dir(file_name=None): + if file_name is None: + return TORCHAIR_CACHE_DIR + return os.path.join(TORCHAIR_CACHE_DIR, file_name) + + +def check_torchair_cache_exist(): + res = False + torch_air_abs_path = _get_torchair_current_work_dir() + if os.path.exists(torch_air_abs_path): + file_list = os.listdir(torch_air_abs_path) + if len(file_list) != 0: + res = True + return res + + +def check_kv_cache_bytes_cache_exist(): + res = False + kv_cache_bytes_cache_abs_path = _get_torchair_current_work_dir( + KV_CACHE_BYTES_CACHE_PATH_NAME) + if os.path.exists(kv_cache_bytes_cache_abs_path): + file_list = os.listdir(kv_cache_bytes_cache_abs_path) + if len(file_list) != 0: + res = True + return res + + +def read_kv_cache_bytes_from_file(rank) -> int: + kv_cache_bytes = -1 + kv_cache_bytes_cache_abs_path = _get_torchair_current_work_dir( + KV_CACHE_BYTES_CACHE_PATH_NAME) + kv_cache_bytes_file = os.path.join( + kv_cache_bytes_cache_abs_path, + f"{rank}_{KV_CACHE_BYTES_CACHE_FILE_NAME}") + with open(kv_cache_bytes_file, "r", encoding="utf-8") as f: + with _file_lock(f, fcntl.LOCK_SH): + kv_cache_bytes = int(f.readline()) + return kv_cache_bytes + + +def write_kv_cache_bytes_to_file(rank, kv_cache_bytes): + kv_cache_bytes_cache_abs_path = _get_torchair_current_work_dir( + KV_CACHE_BYTES_CACHE_PATH_NAME) + os.makedirs(kv_cache_bytes_cache_abs_path, exist_ok=True) + kv_cache_bytes_file = os.path.join( + kv_cache_bytes_cache_abs_path, + f"{rank}_{KV_CACHE_BYTES_CACHE_FILE_NAME}") + with open(kv_cache_bytes_file, "w", encoding="utf-8") as f: + with _file_lock(f, fcntl.LOCK_EX): + f.write(f"{kv_cache_bytes}") + + +def delete_torchair_cache_file(): + torch_air_abs_path = _get_torchair_current_work_dir() + if os.path.exists(torch_air_abs_path): + shutil.rmtree(torch_air_abs_path) + + +def npu_stream_switch(tag: str, priority: int, *, enabled: bool = True): + return _npu_stream_switch(tag, priority) if enabled else nullcontext() + + +def npu_wait_tensor(self: torch.Tensor, + dependency: torch.Tensor, + *, + enabled: bool = True): + return _npu_wait_tensor(self, dependency) if enabled else self diff --git a/vllm_ascend/utils.py b/vllm_ascend/utils.py index 6b6af44..f859b84 100644 --- a/vllm_ascend/utils.py +++ b/vllm_ascend/utils.py @@ -18,12 +18,9 @@ # import atexit -import fcntl import functools import math -import os -import shutil -from contextlib import contextmanager, nullcontext +from contextlib import contextmanager from enum import Enum from threading import Lock from typing import TYPE_CHECKING, List, Tuple @@ -37,14 +34,6 @@ from vllm.logger import logger import vllm_ascend.envs as envs from vllm_ascend.ascend_config import get_ascend_config -try: - # Recent release of torchair has moved these ops to `.scope`. - from torchair.scope import npu_stream_switch as _npu_stream_switch - from torchair.scope import npu_wait_tensor as _npu_wait_tensor -except ImportError: - from torchair.ops import NpuStreamSwitch as _npu_stream_switch - from torchair.ops import npu_wait_tensor as _npu_wait_tensor - if TYPE_CHECKING: from vllm.config import VllmConfig else: @@ -67,6 +56,7 @@ _CUSTOM_OP_ENABLED = None _IS_310P = None _SLEEP_MODE_ENABLED = None _CURRENT_STREAM = None +_ASCEND_CUSTOMOP_IS_REIGISTERED = False def is_310p(): @@ -403,19 +393,6 @@ class ProfileExecuteDuration: return durations -# TODO(wxy): Move to ops module -def npu_stream_switch(tag: str, priority: int, *, enabled: bool = True): - return _npu_stream_switch(tag, priority) if enabled else nullcontext() - - -# TODO(wxy): Move to ops module -def npu_wait_tensor(self: torch.Tensor, - dependency: torch.Tensor, - *, - enabled: bool = True): - return _npu_wait_tensor(self, dependency) if enabled else self - - # TODO(wxy): Move to ops module def npu_prefetch(input: torch.Tensor, dependency: torch.Tensor, @@ -489,83 +466,6 @@ def get_fused_moe_state(ep_size: int, with_prefill: bool, return FusedMoEState.MC2 -KV_CACHE_BYTES_CACHE_PATH_NAME = ".kv_cache_bytes" -KV_CACHE_BYTES_CACHE_FILE_NAME = "kv_cache_bytes" -TORCHAIR_CACHE_PATH_NAME = ".torchair_cache" -TORCHAIR_CACHE_DIR = os.getenv( - 'TORCHAIR_CACHE_HOME', os.path.join(os.getcwd(), TORCHAIR_CACHE_PATH_NAME)) - - -def get_torchair_current_work_dir(file_name=None): - if file_name is None: - return TORCHAIR_CACHE_DIR - return os.path.join(TORCHAIR_CACHE_DIR, file_name) - - -def check_torchair_cache_exist(): - res = False - torch_air_abs_path = get_torchair_current_work_dir() - if os.path.exists(torch_air_abs_path): - file_list = os.listdir(torch_air_abs_path) - if len(file_list) != 0: - res = True - return res - - -def check_kv_cache_bytes_cache_exist(): - res = False - kv_cache_bytes_cache_abs_path = get_torchair_current_work_dir( - KV_CACHE_BYTES_CACHE_PATH_NAME) - if os.path.exists(kv_cache_bytes_cache_abs_path): - file_list = os.listdir(kv_cache_bytes_cache_abs_path) - if len(file_list) != 0: - res = True - return res - - -def read_kv_cache_bytes_from_file(rank) -> int: - kv_cache_bytes = -1 - kv_cache_bytes_cache_abs_path = get_torchair_current_work_dir( - KV_CACHE_BYTES_CACHE_PATH_NAME) - kv_cache_bytes_file = os.path.join( - kv_cache_bytes_cache_abs_path, - f"{rank}_{KV_CACHE_BYTES_CACHE_FILE_NAME}") - with open(kv_cache_bytes_file, "r", encoding="utf-8") as f: - with file_lock(f, fcntl.LOCK_SH): - kv_cache_bytes = int(f.readline()) - return kv_cache_bytes - - -@contextmanager -def file_lock(file_descriptor, lock_type): - fcntl.flock(file_descriptor, lock_type) - try: - yield - finally: - fcntl.flock(file_descriptor, fcntl.LOCK_UN) - - -def write_kv_cache_bytes_to_file(rank, kv_cache_bytes): - kv_cache_bytes_cache_abs_path = get_torchair_current_work_dir( - KV_CACHE_BYTES_CACHE_PATH_NAME) - os.makedirs(kv_cache_bytes_cache_abs_path, exist_ok=True) - kv_cache_bytes_file = os.path.join( - kv_cache_bytes_cache_abs_path, - f"{rank}_{KV_CACHE_BYTES_CACHE_FILE_NAME}") - with open(kv_cache_bytes_file, "w", encoding="utf-8") as f: - with file_lock(f, fcntl.LOCK_EX): - f.write(f"{kv_cache_bytes}") - - -def delete_torchair_cache_file(): - torch_air_abs_path = get_torchair_current_work_dir() - if os.path.exists(torch_air_abs_path): - shutil.rmtree(torch_air_abs_path) - - -_ASCEND_CUSTOMOP_IS_REIGISTERED = False - - def register_ascend_customop(): """Register Ascend CustomOP diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index e027b7c..0ad4f36 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -79,11 +79,12 @@ from vllm_ascend.attention.mla_v1 import (AscendMLAMetadata, CommonAttentionMetadata) from vllm_ascend.platform import NPUPlatform from vllm_ascend.sample.rejection_sampler import AscendRejectionSampler +from vllm_ascend.torchair.utils import (check_torchair_cache_exist, + write_kv_cache_bytes_to_file) from vllm_ascend.utils import (ACL_FORMAT_FRACTAL_ND, ACL_FORMAT_FRACTAL_NZ, - ProfileExecuteDuration, - check_torchair_cache_exist, is_310p, + ProfileExecuteDuration, is_310p, maybe_converting_weight_acl_format, - vllm_version_is, write_kv_cache_bytes_to_file) + vllm_version_is) from vllm_ascend.worker.eagle_proposer_v1 import EagleProposer from vllm_ascend.worker.mtp_proposer_v1 import MtpProposer from vllm_ascend.worker.npu_input_batch import CachedRequestState, InputBatch