From 31f29b9f30eb65f0a38d30ffe613282ce2f1130a Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Fri, 28 Mar 2025 19:34:23 +0800 Subject: [PATCH] [Core] Make V1 work and enable V1 engine test (#389) 1. Make sure the version is string before parse in collect_env 2. Add basic V1 engine test Signed-off-by: wangxiyuan --- ...d_test.yaml => vllm_ascend_test_main.yaml} | 8 +- collect_env.py | 4 +- tests/ops/test_fused_moe.py | 1 + tests/test_offline_inference.py | 7 +- vllm_ascend/attention/__init__.py | 0 vllm_ascend/platform.py | 28 +++-- vllm_ascend/worker/model_runner_v1.py | 111 ++++++------------ vllm_ascend/worker/worker_v1.py | 2 +- 8 files changed, 66 insertions(+), 95 deletions(-) rename .github/workflows/{vllm_ascend_test.yaml => vllm_ascend_test_main.yaml} (94%) create mode 100644 vllm_ascend/attention/__init__.py diff --git a/.github/workflows/vllm_ascend_test.yaml b/.github/workflows/vllm_ascend_test_main.yaml similarity index 94% rename from .github/workflows/vllm_ascend_test.yaml rename to .github/workflows/vllm_ascend_test_main.yaml index 443b7d7..f896a7c 100644 --- a/.github/workflows/vllm_ascend_test.yaml +++ b/.github/workflows/vllm_ascend_test_main.yaml @@ -126,11 +126,15 @@ jobs: cd /code/pta/ pip install ./torch_npu-2.5.1.dev20250320-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl - - name: Run vllm-project/vllm-ascend test + - name: Run vllm-project/vllm-ascend test for V0 Engine run: | VLLM_USE_V1=0 pytest -sv tests - - name: Run vllm-project/vllm test + - name: Run vllm-project/vllm-ascend test for V1 Engine + run: | + VLLM_USE_V1=1 VLLM_WORKER_MULTIPROC_METHOD=spawn pytest -sv tests + + - name: Run vllm-project/vllm test for V0 Engine env: VLLM_USE_V1: 0 PYTORCH_NPU_ALLOC_CONF: max_split_size_mb:256 diff --git a/collect_env.py b/collect_env.py index 5b24bea..c2127fd 100644 --- a/collect_env.py +++ b/collect_env.py @@ -61,7 +61,6 @@ DEFAULT_CONDA_PATTERNS = { "soumith", "mkl", "magma", - "triton", "optree", "transformers", "zmq", @@ 
-73,7 +72,6 @@ DEFAULT_PIP_PATTERNS = { "numpy", "mypy", "flake8", - "triton", "optree", "onnx", "transformers", @@ -156,7 +154,7 @@ def get_cmake_version(run_lambda): def _parse_version(version, version_tuple): version_str = version_tuple[-1] - if version_str.startswith('g'): + if isinstance(version_str, str) and version_str.startswith('g'): if '.' in version_str: git_sha = version_str.split('.')[0][1:] date = version_str.split('.')[-1][1:] diff --git a/tests/ops/test_fused_moe.py b/tests/ops/test_fused_moe.py index f72ad99..e78aa8f 100644 --- a/tests/ops/test_fused_moe.py +++ b/tests/ops/test_fused_moe.py @@ -100,3 +100,4 @@ def test_fused_experts( e_map) # TODO: The native params are: atol=2e-2, rtol=0, maybe related to the nan problem torch.testing.assert_close(output, torch_output, atol=4e-2, rtol=1) + torch.npu.empty_cache() diff --git a/tests/test_offline_inference.py b/tests/test_offline_inference.py index 6ad5c96..3d64be9 100644 --- a/tests/test_offline_inference.py +++ b/tests/test_offline_inference.py @@ -45,8 +45,6 @@ def test_models( dtype: str, max_tokens: int, ) -> None: - os.environ["VLLM_ATTENTION_BACKEND"] = "ASCEND" - # 5042 tokens for gemma2 # gemma2 has alternating sliding window size of 4096 # we need a prompt with more than 4096 tokens to test the sliding window @@ -60,3 +58,8 @@ def test_models( enforce_eager=False, gpu_memory_utilization=0.7) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) + + +if __name__ == "__main__": + import pytest + pytest.main([__file__]) diff --git a/vllm_ascend/attention/__init__.py b/vllm_ascend/attention/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vllm_ascend/platform.py b/vllm_ascend/platform.py index 65381bc..b00c4b8 100644 --- a/vllm_ascend/platform.py +++ b/vllm_ascend/platform.py @@ -21,13 +21,16 @@ from typing import TYPE_CHECKING, Optional, Tuple import torch import torch_npu # noqa: F401 import vllm.envs as envs -from vllm.config import CompilationLevel, 
VllmConfig +from vllm.config import CompilationLevel from vllm.logger import init_logger from vllm.platforms import Platform, PlatformEnum if TYPE_CHECKING: + from vllm.config import ModelConfig, VllmConfig from vllm.utils import FlexibleArgumentParser else: + ModelConfig = None + VllmConfig = None FlexibleArgumentParser = None os.environ["RAY_EXPERIMENTAL_NOSET_ASCEND_RT_VISIBLE_DEVICES"] = "1" @@ -108,14 +111,14 @@ class NPUPlatform(Platform): parallel_config.worker_cls = "vllm_ascend.worker.worker.NPUWorker" cache_config = vllm_config.cache_config - if cache_config and cache_config.block_size is None: - cache_config.block_size = 128 - - if envs.VLLM_USE_V1 and cache_config and cache_config.enable_prefix_caching: - logger.warning( - "Prefix caching is not supported for V1 now, disable prefix caching" - ) - cache_config.enable_prefix_caching = False + if cache_config: + if cache_config.block_size is None: + cache_config.block_size = 128 + if envs.VLLM_USE_V1 and cache_config.enable_prefix_caching: + logger.warning( + "Prefix caching is not supported for V1 now, disable prefix caching" + ) + cache_config.enable_prefix_caching = False @classmethod def get_attn_backend_cls(cls, selected_backend, head_size, dtype, @@ -140,3 +143,10 @@ class NPUPlatform(Platform): @classmethod def is_pin_memory_available(cls): return True + + @classmethod + def supports_v1(cls, model_config: ModelConfig) -> bool: + """Returns whether the current platform can support v1 for the supplied + model configuration. 
+ """ + return True diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 4648d76..8b080c7 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -42,9 +42,7 @@ from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, DeviceMemoryProfiler, from vllm.v1.core.encoder_cache_manager import compute_encoder_budget from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, KVCacheSpec) -from vllm.v1.outputs import ModelRunnerOutput -from vllm.v1.sample.rejection_sampler import INVALID_TOKEN_ID, RejectionSampler -from vllm.v1.spec_decode.ngram_proposer import NgramProposer +from vllm.v1.outputs import EMPTY_MODEL_RUNNER_OUTPUT, ModelRunnerOutput from vllm.v1.utils import bind_kv_cache from vllm.v1.worker.gpu_input_batch import CachedRequestState, InputBatch @@ -112,7 +110,7 @@ class NPUModelRunner: encoder_compute_budget, encoder_cache_size = compute_encoder_budget( model_config=model_config, scheduler_config=scheduler_config, - ) + mm_registry=self.mm_registry) self.max_num_encoder_input_tokens = encoder_compute_budget self.encoder_cache_size = encoder_cache_size @@ -124,21 +122,6 @@ class NPUModelRunner: # Set up speculative decoding. self.use_spec_decode = False - if self.speculative_config: - self.use_spec_decode = True - self.rejection_sampler = RejectionSampler() - # TODO: find a better way to check if we are using ngram. - assert self.speculative_config.ngram_prompt_lookup_min, \ - "Currently, only ngram spec decode is supported in V1." - if get_pp_group().is_last_rank: - self.drafter = NgramProposer() - # Trigger Numba JIT compilation for N-gram proposer. - # This usually takes less than 1 second. - self.drafter.propose( - np.zeros(1024, dtype=np.int32), - self.speculative_config.ngram_prompt_lookup_min, - self.speculative_config.num_speculative_tokens, - ) # Request states. 
 self.requests: Dict[str, CachedRequestState] = {} @@ -504,6 +487,9 @@ class NPUModelRunner: intermediate_tensors: Optional[IntermediateTensors] = None, ) -> Union[ModelRunnerOutput, torch.Tensor]: self._update_states(scheduler_output) + if not scheduler_output.total_num_scheduled_tokens: + # Return empty ModelRunnerOutput if there's no work to do. + return EMPTY_MODEL_RUNNER_OUTPUT hidden_states = self._process_reqs(scheduler_output, intermediate_tensors) logits = self.model.compute_logits(hidden_states, None) @@ -540,15 +526,6 @@ class NPUModelRunner: if max_gen_len == 1: # No spec decode tokens. valid_sampled_token_ids = sampled_token_ids.tolist() - else: - # Includes spec decode tokens. - valid_mask = sampled_token_ids != INVALID_TOKEN_ID - gen_lens = valid_mask.sum(dim=1).tolist() - # TODO(woosuk): Optimize this. - valid_sampled_token_ids = [ - seq.tolist() - for seq in sampled_token_ids[valid_mask].split(gen_lens) - ] model_runner_output = ModelRunnerOutput( req_ids=self.input_batch.req_ids, @@ -723,34 +700,6 @@ class NPUModelRunner: self.encoder_cache.clear() gc.collect() - def generate_draft_token_ids( - self, - sampled_token_ids: list[list[int]], - ) -> list[list[int]]: - # TODO(woosuk): Optimize. - draft_token_ids: list[list[int]] = [] - for i, sampled_ids in enumerate(sampled_token_ids): - num_sampled_ids = len(sampled_ids) - if not num_sampled_ids: - # Skip speculative decoding. - draft_token_ids.append([]) - continue - - # Add sampled_token_ids to token_ids_cpu.
- start_idx = self.input_batch.num_tokens_no_spec[i] - end_idx = start_idx + num_sampled_ids - self.input_batch.token_ids_cpu[i, start_idx:end_idx] = sampled_ids - drafter_output = self.drafter.propose( - self.input_batch.token_ids_cpu[i, :end_idx], - self.speculative_config.ngram_prompt_lookup_min, - self.speculative_config.num_speculative_tokens, - ) - if drafter_output is None or len(drafter_output) == 0: - draft_token_ids.append([]) - else: - draft_token_ids.append(drafter_output.tolist()) - return draft_token_ids - def load_model(self) -> None: logger.info("Starting to load model %s...", self.model_config.model) @@ -770,34 +719,40 @@ class NPUModelRunner: kv_cache_config: Configuration for the KV cache, including the KV cache size of each layer """ - if len(kv_cache_config.groups) > 1: - raise NotImplementedError( - "Hybrid models with more than one KV cache type are not " - "supported yet.") - kv_caches: Dict[str, torch.Tensor] = {} - - for layer_name, layer_spec in kv_cache_config.kv_cache_spec.items(): - tensor_config = kv_cache_config.tensors[layer_name] - assert tensor_config.size % layer_spec.page_size_bytes == 0 - num_blocks = tensor_config.size // layer_spec.page_size_bytes - if isinstance(layer_spec, FullAttentionSpec): - kv_cache_shape = AscendAttentionBackend.get_kv_cache_shape( - num_blocks, layer_spec.block_size, layer_spec.num_kv_heads, - layer_spec.head_size) - dtype = layer_spec.dtype - kv_caches[layer_name] = torch.zeros(kv_cache_shape, - dtype=dtype, - device=self.device) - else: - raise NotImplementedError + for kv_cache_group in kv_cache_config.kv_cache_groups: + kv_cache_spec = kv_cache_group.kv_cache_spec + for layer_name in kv_cache_group.layer_names: + tensor_config = kv_cache_config.tensors[layer_name] + assert tensor_config.size % kv_cache_spec.page_size_bytes == 0 + num_blocks = tensor_config.size // kv_cache_spec.page_size_bytes + # `num_blocks` is the number of blocks the model runner can use. 
+ # `kv_cache_config.num_blocks` is the number of blocks that + # KVCacheManager may allocate. + # Since different GPUs may have different number of layers and + # different memory capacities, `num_blocks` can be different on + # different GPUs, and `kv_cache_config.num_blocks` is set to + # the min of all `num_blocks`. Verify it here. + assert num_blocks >= kv_cache_config.num_blocks + if isinstance(kv_cache_spec, FullAttentionSpec): + kv_cache_shape = AscendAttentionBackend.get_kv_cache_shape( + num_blocks, kv_cache_spec.block_size, + kv_cache_spec.num_kv_heads, kv_cache_spec.head_size) + dtype = kv_cache_spec.dtype + kv_caches[layer_name] = torch.zeros(kv_cache_shape, + dtype=dtype, + device=self.device) + else: + # TODO: add new branches when introducing more types of + # KV cache specs. + raise ValueError("Unknown KV cache spec type.") bind_kv_cache( kv_caches, self.vllm_config.compilation_config.static_forward_context, self.kv_caches) - def get_kv_cache_spec(self) -> KVCacheSpec: + def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]: """ Generates the KVCacheSpec by parsing the kv cache format from each Attention module in the static forward context. 
@@ -809,7 +764,7 @@ class NPUModelRunner: forward_ctx = self.vllm_config.compilation_config.static_forward_context block_size = self.vllm_config.cache_config.block_size use_mla = self.vllm_config.model_config.use_mla - kv_cache_spec: KVCacheSpec = {} + kv_cache_spec: dict[str, KVCacheSpec] = {} for layer_name, attn_module in forward_ctx.items(): if isinstance(attn_module, FusedMoE): continue diff --git a/vllm_ascend/worker/worker_v1.py b/vllm_ascend/worker/worker_v1.py index dd840e9..b9361d2 100644 --- a/vllm_ascend/worker/worker_v1.py +++ b/vllm_ascend/worker/worker_v1.py @@ -210,7 +210,7 @@ class NPUWorker(WorkerBase): def get_model(self) -> nn.Module: return self.model_runner.get_model() - def get_kv_cache_spec(self) -> KVCacheSpec: + def get_kv_cache_spec(self) -> dict[str, KVCacheSpec]: return self.model_runner.get_kv_cache_spec() def initialize_from_config(self, kv_cache_config: KVCacheConfig) -> None: