Refactor e2e CI (#2276)

Refactor E2E CI to make it clear and faster
1. remove some uesless e2e test
2. remove some uesless function
3. Make sure all test runs with VLLMRunner to avoid oom error
4. Make sure all ops test end with torch.empty_cache to avoid oom error
5. run the test one by one to avoid resource limit error


- vLLM version: v0.10.1.1
- vLLM main:
a344a5aa0a

Signed-off-by: wangxiyuan <wangxiyuan1007@gmail.com>
This commit is contained in:
wangxiyuan
2025-09-02 09:02:22 +08:00
committed by GitHub
parent 0df059f41a
commit fef18b60bc
41 changed files with 374 additions and 1757 deletions

View File

@@ -191,27 +191,29 @@ jobs:
VLLM_WORKER_MULTIPROC_METHOD: spawn
VLLM_USE_MODELSCOPE: True
run: |
pytest -sv tests/e2e/singlecard/test_offline_inference.py
# pytest -sv tests/e2e/singlecard/test_ilama_lora.py
pytest -sv tests/e2e/singlecard/test_guided_decoding.py
# We found that if running aclgraph tests in batch, it will cause AclmdlRICaptureBegin error. So we run
# the test separately.
pytest -sv tests/e2e/singlecard/test_aclgraph.py
pytest -sv tests/e2e/singlecard/test_ascend_scheduler.py
pytest -sv tests/e2e/singlecard/test_camem.py
pytest -sv tests/e2e/singlecard/test_chunked.py
pytest -sv tests/e2e/singlecard/test_embedding.py
pytest -sv tests/e2e/singlecard/test_guided_decoding.py
# TODO: Fix lora accuracy error
# pytest -sv tests/e2e/singlecard/test_ilama_lora.py
pytest -sv tests/e2e/singlecard/test_profile_execute_duration.py
pytest -sv tests/e2e/singlecard/test_quantization.py
pytest -sv tests/e2e/singlecard/test_sampler.py
pytest -sv tests/e2e/singlecard/test_vlm.py
# ------------------------------------ v1 spec decode test ------------------------------------ #
pytest -sv tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py
# TODO: revert me when test_v1_spec_decode.py::test_ngram_correctness is fixed
pytest -sv tests/e2e/singlecard/spec_decode_v1/test_v1_spec_decode.py
# All other tests, ignore: 310p test, accuracy test.
pytest -sv tests/e2e/singlecard/ \
--ignore=tests/e2e/singlecard/test_offline_inference.py \
--ignore=tests/e2e/singlecard/test_ilama_lora.py \
--ignore=tests/e2e/singlecard/test_guided_decoding.py \
--ignore=tests/e2e/singlecard/test_camem.py \
--ignore=tests/e2e/singlecard/test_embedding.py \
--ignore=tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py \
--ignore=tests/e2e/singlecard/spec_decode_v1/test_v1_spec_decode.py \
--ignore=tests/e2e/singlecard/test_offline_inference_310p.py
pytest -sv tests/e2e/singlecard/ops/
e2e-2-cards:
needs: [e2e]
if: ${{ needs.e2e.result == 'success' }}
@@ -273,17 +275,23 @@ jobs:
VLLM_WORKER_MULTIPROC_METHOD: spawn
VLLM_USE_MODELSCOPE: True
run: |
pytest -sv tests/e2e/multicard/test_data_parallel.py
pytest -sv tests/e2e/multicard/test_expert_parallel.py
# external_launcher test is not stable enough. Fix it later
# pytest -sv tests/e2e/multicard/test_external_launcher.py
pytest -sv tests/e2e/multicard/test_fused_moe_allgather_ep.py
# pytest -sv tests/e2e/multicard/test_ilama_lora_tp2.py
# Fixme: run VLLM_USE_MODELSCOPE=True pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py will raise error.
# To avoid oom, we need to run the test in a single process.
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_multistream_moe
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_QwQ
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeekV3_dbo
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_multistream_moe
#pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_pangu
#pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_Qwen3_W8A8
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_Qwen3_W4A8DYNAMIC
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_models_distributed_DeepSeek_W4A8DYNAMIC
pytest -sv tests/e2e/multicard/test_offline_inference_distributed.py::test_sp_for_qwen3_moe
pytest -sv tests/e2e/multicard/test_data_parallel.py
pytest -sv tests/e2e/multicard/ --ignore=tests/e2e/multicard/test_ilama_lora_tp2.py \
--ignore=tests/e2e/multicard/test_offline_inference_distributed.py \
--ignore=tests/e2e/multicard/test_data_parallel.py \
--ignore=tests/e2e/multicard/test_offline_inference_310p.py
#pytest -sv tests/e2e/multicard/test_pipeline_parallel.py
#pytest -sv tests/e2e/multicard/test_prefix_caching.py
#pytest -sv tests/e2e/multicard/test_qwen3_moe.py
#pytest -sv tests/e2e/multicard/test_torchair_graph_mode.py

View File

@@ -111,7 +111,7 @@ jobs:
PYTORCH_NPU_ALLOC_CONF: max_split_size_mb:256
run: |
if [[ "${{ matrix.os }}" == "linux-aarch64-310p-1" ]]; then
pytest -sv tests/e2e/singlecard/test_offline_inference_310p.py
pytest -sv tests/e2e/310p/test_offline_inference_310p.py
else
pytest -sv tests/e2e/multicard/test_offline_inference_310p.py
fi
pytest -sv tests/e2e/310p/test_offline_inference_parallel_310p.py
fi

View File

@@ -21,7 +21,7 @@ from vllm import SamplingParams
import vllm_ascend # noqa: F401
from tests.e2e.conftest import VllmRunner
MODELS = ["Qwen/Qwen3-0.6B-Base", "Qwen/Qwen2.5-7B-Instruct"]
MODELS = ["Qwen/Qwen3-0.6B", "Qwen/Qwen2.5-7B-Instruct"]
@pytest.mark.parametrize("model", MODELS)

View File

@@ -33,13 +33,11 @@ from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer,
from transformers.models.auto.auto_factory import _BaseAutoModelClass
from vllm import LLM, SamplingParams
from vllm.config import TaskOption, _get_and_verify_dtype
from vllm.inputs import ExplicitEncoderDecoderPrompt, TextPrompt, TokensPrompt
from vllm.inputs import TextPrompt
from vllm.outputs import RequestOutput
from vllm.sampling_params import BeamSearchParams
from vllm.transformers_utils.utils import maybe_model_redirect
from vllm.utils import is_list_of
from tests.e2e.model_utils import (PROMPT_TEMPLATES, TokensTextLogprobs,
from tests.e2e.model_utils import (TokensTextLogprobs,
TokensTextLogprobsPromptLogprobs)
# TODO: remove this part after the patch merged into vllm, if
# we not explicitly patch here, some of them might be effectiveless
@@ -62,7 +60,6 @@ PromptAudioInput = _PromptMultiModalInput[Tuple[np.ndarray, int]]
PromptVideoInput = _PromptMultiModalInput[np.ndarray]
_TEST_DIR = os.path.dirname(__file__)
_TEST_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "example.txt")]
def cleanup_dist_env_and_memory(shutdown_ray: bool = False):
@@ -89,13 +86,13 @@ class VllmRunner:
# Use smaller max model length, otherwise bigger model cannot run due
# to kv cache size limit.
max_model_len: int = 1024,
dtype: str = "half",
dtype: str = "auto",
disable_log_stats: bool = True,
tensor_parallel_size: int = 1,
block_size: int = 16,
enable_chunked_prefill: bool = False,
swap_space: int = 4,
enforce_eager: Optional[bool] = True,
enforce_eager: Optional[bool] = False,
quantization: Optional[str] = None,
**kwargs,
) -> None:
@@ -220,26 +217,6 @@ class VllmRunner:
if sampling_params.prompt_logprobs is None else
toks_str_logsprobs_prompt_logprobs)
def generate_encoder_decoder_w_logprobs(
self,
encoder_decoder_prompts: List[ExplicitEncoderDecoderPrompt[str, str]],
sampling_params: SamplingParams,
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
'''
Logprobs generation for vLLM encoder/decoder models
'''
assert sampling_params.logprobs is not None
req_outputs = self.model.generate(encoder_decoder_prompts,
sampling_params=sampling_params)
toks_str_logsprobs_prompt_logprobs = (
self._final_steps_generate_w_logprobs(req_outputs))
# Omit prompt logprobs if not required by sampling params
return ([x[0:-1] for x in toks_str_logsprobs_prompt_logprobs]
if sampling_params.prompt_logprobs is None else
toks_str_logsprobs_prompt_logprobs)
def generate_greedy(
self,
prompts: List[str],
@@ -284,53 +261,6 @@ class VllmRunner:
audios=audios,
videos=videos)
def generate_encoder_decoder_greedy_logprobs(
self,
encoder_decoder_prompts: List[ExplicitEncoderDecoderPrompt[str, str]],
max_tokens: int,
num_logprobs: int,
num_prompt_logprobs: Optional[int] = None,
) -> Union[List[TokensTextLogprobs],
List[TokensTextLogprobsPromptLogprobs]]:
greedy_logprobs_params = SamplingParams(
temperature=0.0,
max_tokens=max_tokens,
logprobs=num_logprobs,
prompt_logprobs=(num_prompt_logprobs),
)
'''
Greedy logprobs generation for vLLM encoder/decoder models
'''
return self.generate_encoder_decoder_w_logprobs(
encoder_decoder_prompts, greedy_logprobs_params)
def generate_beam_search(
self,
prompts: Union[List[str], List[List[int]]],
beam_width: int,
max_tokens: int,
) -> List[Tuple[List[List[int]], List[str]]]:
if is_list_of(prompts, str, check="all"):
prompts = [TextPrompt(prompt=prompt) for prompt in prompts]
else:
prompts = [
TokensPrompt(prompt_token_ids=tokens) for tokens in prompts
]
outputs = self.model.beam_search(
prompts,
BeamSearchParams(beam_width=beam_width, max_tokens=max_tokens))
returned_outputs = []
for output in outputs:
token_ids = [x.tokens for x in output.sequences]
texts = [x.text for x in output.sequences]
returned_outputs.append((token_ids, texts))
return returned_outputs
def classify(self, prompts: List[str]) -> List[List[float]]:
req_outputs = self.model.classify(prompts)
return [req_output.outputs.probs for req_output in req_outputs]
def encode(
self,
prompts: List[str],
@@ -346,14 +276,6 @@ class VllmRunner:
req_outputs = self.model.embed(inputs)
return [req_output.outputs.embedding for req_output in req_outputs]
def score(
self,
text_1: Union[str, List[str]],
text_2: Union[str, List[str]],
) -> List[float]:
req_outputs = self.model.score(text_1, text_2)
return [req_output.outputs.score for req_output in req_outputs]
def __enter__(self):
return self
@@ -362,35 +284,6 @@ class VllmRunner:
cleanup_dist_env_and_memory()
@pytest.fixture(scope="session")
def vllm_runner():
return VllmRunner
@pytest.fixture(params=list(PROMPT_TEMPLATES.keys()))
def prompt_template(request):
return PROMPT_TEMPLATES[request.param]
def _read_prompts(filename: str) -> list[str]:
with open(filename) as f:
prompts = f.readlines()
return prompts
@pytest.fixture
def example_prompts() -> list[str]:
prompts = []
for filename in _TEST_PROMPTS:
prompts += _read_prompts(filename)
return prompts
@pytest.fixture(scope="session")
def ilama_lora_files():
return snapshot_download(repo_id="vllm-ascend/ilama-text2sql-spider")
class HfRunner:
def get_default_device(self):
@@ -515,5 +408,22 @@ class HfRunner:
@pytest.fixture(scope="session")
def hf_runner():
return HfRunner
def ilama_lora_files():
return snapshot_download(repo_id="vllm-ascend/ilama-text2sql-spider")
def qwen_prompt(questions: List[str]) -> List[str]:
placeholder = "<|image_pad|>"
return [("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>"
f"{q}<|im_end|>\n<|im_start|>assistant\n") for q in questions]
PROMPT_TEMPLATES = {
"qwen2.5vl": qwen_prompt,
}
@pytest.fixture(params=list(PROMPT_TEMPLATES.keys()))
def prompt_template(request):
return PROMPT_TEMPLATES[request.param]

View File

@@ -17,10 +17,9 @@
# Adapted from vllm-project/vllm/blob/main/tests/models/utils.py
#
import warnings
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import Dict, List, Optional, Sequence, Tuple, Union
from vllm.sequence import Logprob, PromptLogprobs, SampleLogprobs
from vllm.sequence import PromptLogprobs, SampleLogprobs
TokensText = Tuple[List[int], str]
@@ -63,17 +62,6 @@ TokensTextLogprobs = Tuple[List[int], str, Optional[Union[List[Dict[int,
float]],
SampleLogprobs]]]
# Allow for tokens to be represented as str's rather than IDs;
# tuple of
# * Token string representations list
# * String
# * Optional list of top sample logprobs for each sampled token
#
# Assumes prompt logprobs were not requested.
TextTextLogprobs = Tuple[List[str], str, Optional[Union[List[Dict[str, float]],
List[Dict[str,
Logprob]]]]]
# Representation of generated sequence as a tuple of
# * Token ID list
# * String
@@ -84,191 +72,3 @@ TextTextLogprobs = Tuple[List[str], str, Optional[Union[List[Dict[str, float]],
TokensTextLogprobsPromptLogprobs = Tuple[
List[int], str, Optional[Union[List[Dict[int, float]], SampleLogprobs]],
Optional[Union[List[Optional[Dict[int, float]]], PromptLogprobs]]]
def check_logprobs_close(
*,
outputs_0_lst: Sequence[Union[TokensTextLogprobs,
TokensTextLogprobsPromptLogprobs,
TextTextLogprobs]],
outputs_1_lst: Sequence[Union[TokensTextLogprobs,
TokensTextLogprobsPromptLogprobs,
TextTextLogprobs]],
name_0: str,
name_1: str,
num_outputs_0_skip_tokens: int = 0,
warn_on_mismatch: bool = True,
always_check_logprobs: bool = False,
) -> None:
"""Compare the logprobs of two sequences generated by different models,
which should be similar but not necessarily equal.
How sample logprobs are compared:
* `always_check_logprobs == True`: set of highest-logprob token ids
must match between seq0 and seq1 at all sampled token offsets
* `always_check_logprobs == False`: highest-logprob token ids are
only compared at sampled token offsets for which generated token
ids don't match
Prompt logprobs must be provided either for both input sequences, or
for neither. If prompt logprobs are provided, then highest-logprob
prompt token ids must match between seq0 and seq1 at all prompt token
offsets.
Args:
outputs_0_lst: First sequence to compare
outputs_0_lst: Second sequence to compare
name_0: sequence #0 name
name_1: sequence #1 name
num_outputs_0_skip_tokens: If > 0, specifies the number of initial
sequence #0 tokens & logprobs to discard
before comparison, i.e. all
of sequence #1 will be compared to
sequence #0 beginning at index
num_outputs_0_skip_tokens
warn_on_mismatch: Issue a warning if there is token-wise or text-wise
mismatch between the two sequences
always_check_logprobs: If true, check logprobs even when tokens match
"""
assert len(outputs_0_lst) == len(outputs_1_lst)
# Loop through responses to each prompt.
for prompt_idx, (outputs_0,
outputs_1) in enumerate(zip(outputs_0_lst,
outputs_1_lst)):
assert len(outputs_0) == len(outputs_1)
if len(outputs_0) == 3:
assert len(outputs_1) == 3
# Break out tokens, text & sample logprobs
# (prompt logprobs were not provided)
output_ids_0, output_str_0, logprobs_0 = outputs_0
output_ids_1, output_str_1, logprobs_1 = outputs_1
elif len(outputs_0) == 4:
assert len(outputs_1) == 4
# Break out tokens, text, sample logprobs & prompt logprobs
(
output_ids_0,
output_str_0,
logprobs_0,
prompt_logprobs_0,
) = outputs_0
(
output_ids_1,
output_str_1,
logprobs_1,
prompt_logprobs_1,
) = outputs_1
# Test prompt logprobs closeness
if (prompt_logprobs_0 is not None
and prompt_logprobs_1 is not None):
# Both sequences' prompt logprobs lists are not `None``
# (although individual list elements may be `None`);
# for each token's logprobs:
for idx, (logprobs_elem_0, logprobs_elem_1) in enumerate(
zip(prompt_logprobs_0, prompt_logprobs_1)):
fail_msg = (
f"Prompt logprobs test:"
f"\n{name_0}:\tPrompt index {idx}\t{logprobs_elem_0}"
f"\n{name_1}:\tPrompt index {idx}\t{logprobs_elem_1}")
if logprobs_elem_0 is None:
# If the seq 0 token's logprobs are `None`,
# the seq 1 token's logprobs must be `None`
assert logprobs_elem_1 is None, fail_msg
else:
# If the seq 0 token's logprobs are not `None`,
# the seq 1 token's logprobs must not be `None`
assert logprobs_elem_1 is not None, fail_msg
# Logprobs check: top-k token choices must be the same
assert (set(logprobs_elem_0.keys()) == set(
logprobs_elem_1.keys())), fail_msg
else:
# Both sequence logprobs lists must be `None`
fail_msg = (f"Prompt logprobs test:"
f"\n{name_0}:\tlogprobs\t{prompt_logprobs_0}"
f"\n{name_1}:\tlogprobs\t{prompt_logprobs_1}")
assert (prompt_logprobs_0 is None
and prompt_logprobs_1 is None), fail_msg
else:
raise ValueError(f"Outputs tuple must have 3 or 4 elements but "
f"{len(outputs_0)} elements were provided: "
f"{outputs_0}")
if logprobs_0 is None:
logprobs_0 = [None] * len(output_ids_0)
if logprobs_1 is None:
logprobs_1 = [None] * len(output_ids_1)
# Skip specified number of initial sequence #0 tokens
# & logprobs, leaving output text as-is for simplicity
# (text mismatches may generate warnings but do not
# cause the test to fail.)
if num_outputs_0_skip_tokens < 0:
raise ValueError("num_outputs_0_skip_tokens must be non-negative")
output_ids_0 = output_ids_0[num_outputs_0_skip_tokens:]
logprobs_0 = logprobs_0[num_outputs_0_skip_tokens:]
# Loop through generated tokens.
for idx, (output_id_0,
output_id_1) in enumerate(zip(output_ids_0, output_ids_1)):
is_tok_mismatch = output_id_0 != output_id_1
# If generated tokens don't match
# or it is desired to always check logprobs,
# then
if is_tok_mismatch or always_check_logprobs:
logprobs_elem_0 = logprobs_0[idx]
logprobs_elem_1 = logprobs_1[idx]
# Each predicted token must be in top N logprobs of the other
fail_msg = (
f"Test{prompt_idx}:"
f"\nMatched tokens:\t{output_ids_0[:idx]}"
f"\n{name_0}:\t{output_str_0!r}\t{logprobs_elem_0}"
f"\n{name_1}:\t{output_str_1!r}\t{logprobs_elem_1}")
assert logprobs_elem_0 is not None, fail_msg
assert logprobs_elem_1 is not None, fail_msg
assert output_id_0 in logprobs_elem_1, fail_msg
assert output_id_1 in logprobs_elem_0, fail_msg
if warn_on_mismatch and is_tok_mismatch:
with warnings.catch_warnings():
# This ensures that repeated warnings are shown
# in the output, not just the first occurrence
warnings.simplefilter("always")
warnings.warn(fail_msg, stacklevel=2)
# Break out since sequences will now diverge.
break
else:
if output_str_0 != output_str_1 and warn_on_mismatch:
# The token outputs exactly match,
# so the text outputs should exactly match as well
fail_msg = (f"Test{prompt_idx}:"
f"\n{name_0}:\t{output_str_0!r}"
f"\n{name_1}:\t{output_str_1!r}")
with warnings.catch_warnings():
# This ensures that repeated warnings are shown
# in the output, not just the first occurrence
warnings.simplefilter("always")
warnings.warn(fail_msg, stacklevel=2)
def qwen_prompt(questions: List[str]) -> List[str]:
placeholder = "<|image_pad|>"
return [("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>"
f"{q}<|im_end|>\n<|im_start|>assistant\n") for q in questions]
# Map of prompt templates for different models.
PROMPT_TEMPLATES: dict[str, Callable] = {
"qwen2.5vl": qwen_prompt,
}

View File

@@ -27,7 +27,7 @@ from unittest.mock import patch
import pytest
MODELS = ["Qwen/Qwen2.5-0.5B-Instruct", "Qwen/Qwen3-30B-A3B"]
MODELS = ["Qwen/Qwen3-0.6B", "Qwen/Qwen3-30B-A3B"]
@pytest.mark.parametrize("model", MODELS)

View File

@@ -1,59 +0,0 @@
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
import pytest
import torch
from vllm import SamplingParams
from tests.e2e.conftest import VllmRunner
MODELS = [
"Qwen/Qwen2.5-0.5B-Instruct",
]
TENSOR_PARALLELS = [2]
prompts = [
"Hello, my name is",
"The future of AI is",
]
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("tp_size", TENSOR_PARALLELS)
@pytest.mark.parametrize("max_tokens", [64])
@pytest.mark.parametrize("temperature", [0.0])
@pytest.mark.parametrize("ignore_eos", [True])
def test_models(model: str, tp_size: int, max_tokens: int, temperature: int,
ignore_eos: bool) -> None:
# Create an LLM.
with VllmRunner(
model_name=model,
tensor_parallel_size=tp_size,
) as vllm_model:
# Prepare sampling_parames
sampling_params = SamplingParams(
max_tokens=max_tokens,
temperature=temperature,
ignore_eos=ignore_eos,
)
# Generate texts from the prompts.
# The output is a list of RequestOutput objects
outputs = vllm_model.generate(prompts, sampling_params)
torch.npu.synchronize()
# The output length should be equal to prompts length.
assert len(outputs) == len(prompts)

View File

@@ -14,12 +14,14 @@ def test_e2e_ep_correctness(model_name):
]
max_tokens = 5
with VllmRunner(model_name, tensor_parallel_size=2) as vllm_model:
with VllmRunner(model_name, tensor_parallel_size=2,
enforce_eager=True) as vllm_model:
tp_output = vllm_model.generate_greedy(example_prompts, max_tokens)
with VllmRunner(model_name,
tensor_parallel_size=2,
enable_expert_parallel=True) as vllm_model:
enable_expert_parallel=True,
enforce_eager=True) as vllm_model:
ep_output = vllm_model.generate_greedy(example_prompts, max_tokens)
check_outputs_equal(

View File

@@ -47,7 +47,6 @@ def test_generate_with_allgather():
with VllmRunner(snapshot_download("vllm-ascend/DeepSeek-V3-Pruning"),
tensor_parallel_size=2,
enforce_eager=True,
max_model_len=1024,
dtype="auto",
enable_expert_parallel=True,
@@ -75,7 +74,6 @@ def test_generate_with_alltoall():
with VllmRunner(snapshot_download("vllm-ascend/DeepSeek-V3-Pruning"),
tensor_parallel_size=2,
enforce_eager=True,
max_model_len=1024,
dtype="auto",
enable_expert_parallel=True,

View File

@@ -11,11 +11,12 @@ def test_ilama_lora_tp2(distributed_executor_backend, ilama_lora_files):
with VllmRunner(snapshot_download(MODEL_PATH),
enable_lora=True,
max_loras=4,
dtype="half",
max_model_len=1024,
max_num_seqs=16,
tensor_parallel_size=2,
distributed_executor_backend=distributed_executor_backend
) as vllm_model:
distributed_executor_backend=distributed_executor_backend,
enforce_eager=True) as vllm_model:
output = do_sample(vllm_model.model, ilama_lora_files, lora_id=2)
for i in range(len(EXPECTED_LORA_OUTPUT)):

View File

@@ -23,18 +23,12 @@ Run `pytest tests/test_offline_inference.py`.
import os
from unittest.mock import patch
import pytest
from modelscope import snapshot_download # type: ignore
from vllm import SamplingParams
from vllm.model_executor.models.registry import ModelRegistry
from tests.e2e.conftest import VllmRunner
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
DEEPSEEK_W4A8_MODELS = [
"vllm-ascend/DeepSeek-V3-W4A8-Pruing",
"vllm-ascend/DeepSeek-R1-w4a8-pruning"
]
def test_models_distributed_QwQ():
@@ -48,6 +42,7 @@ def test_models_distributed_QwQ():
dtype=dtype,
tensor_parallel_size=2,
distributed_executor_backend="mp",
enforce_eager=True,
) as vllm_model:
vllm_model.generate_greedy(example_prompts, max_tokens)
@@ -73,35 +68,10 @@ def test_models_distributed_DeepSeek_multistream_moe():
},
"refresh": True,
},
enforce_eager=False,
) as vllm_model:
vllm_model.generate_greedy(example_prompts, max_tokens)
@pytest.mark.skip(
reason=
"deepseek dbo dose not consider the support on half precision float, will enable this ut after we actually support it"
)
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_DBO": "1"})
def test_models_distributed_DeepSeekV3_dbo():
example_prompts = ["The president of the United States is"] * 41
dtype = "half"
sampling_params = SamplingParams(max_tokens=100, temperature=0.0)
with VllmRunner(
"vllm-ascend/DeepSeek-V3-Pruning",
dtype=dtype,
tensor_parallel_size=2,
distributed_executor_backend="mp",
) as vllm_model:
model_arch = 'DeepseekV3ForCausalLM'
registed_models = ModelRegistry.models
assert registed_models[
model_arch].module_name == "vllm_ascend.models.deepseek_dbo"
assert registed_models[
model_arch].class_name == "CustomDeepseekDBOForCausalLM"
vllm_model.generate(example_prompts, sampling_params)
def test_models_distributed_pangu():
example_prompts = [
"Hello, my name is",
@@ -118,28 +88,6 @@ def test_models_distributed_pangu():
vllm_model.generate_greedy(example_prompts, max_tokens)
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_TOPK_TOPP_OPTIMIZATION": "1"})
def test_models_distributed_topk() -> None:
example_prompts = [
"vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs.",
"Briefly describe the major milestones in the development of artificial intelligence from 1950 to 2020.",
"Compare and contrast artificial intelligence with human intelligence in terms of processing information.",
]
dtype = "half"
sampling_params = SamplingParams(max_tokens=5,
temperature=0.0,
top_k=50,
top_p=0.9)
with VllmRunner(
"deepseek-ai/DeepSeek-V2-Lite",
dtype=dtype,
tensor_parallel_size=2,
distributed_executor_backend="mp",
) as vllm_model:
vllm_model.generate(example_prompts, sampling_params)
def test_models_distributed_Qwen3_W8A8():
example_prompts = [
"Hello, my name is",
@@ -172,15 +120,14 @@ def test_models_distributed_Qwen3_W4A8DYNAMIC():
vllm_model.generate_greedy(example_prompts, max_tokens)
@pytest.mark.parametrize("model", DEEPSEEK_W4A8_MODELS)
@patch.dict(os.environ, {"VLLM_ASCEND_MLA_PA": "1"})
def test_models_distributed_DeepSeek_W4A8DYNAMIC(model):
def test_models_distributed_DeepSeek_W4A8DYNAMIC():
prompts = [
"Hello, my name is",
]
max_tokens = 5
with VllmRunner(
snapshot_download(model),
snapshot_download("vllm-ascend/DeepSeek-V3-W4A8-Pruing"),
dtype="auto",
tensor_parallel_size=2,
quantization="ascend",
@@ -207,16 +154,15 @@ def test_sp_for_qwen3_moe() -> None:
top_k=50,
top_p=0.9)
with VllmRunner(
snapshot_download("Qwen/Qwen3-30B-A3B"),
dtype="auto",
tensor_parallel_size=2,
distributed_executor_backend="mp",
compilation_config={
"pass_config": {
"enable_sequence_parallelism": True
}
},
enable_expert_parallel=True,
) as vllm_model:
with VllmRunner(snapshot_download("Qwen/Qwen3-30B-A3B"),
dtype="auto",
tensor_parallel_size=2,
distributed_executor_backend="mp",
compilation_config={
"pass_config": {
"enable_sequence_parallelism": True
}
},
enable_expert_parallel=True,
enforce_eager=True) as vllm_model:
vllm_model.generate(example_prompts, sampling_params)

View File

@@ -42,6 +42,5 @@ def test_models(model: str, tp_size: int, pp_size: int,
tensor_parallel_size=tp_size,
pipeline_parallel_size=pp_size,
distributed_executor_backend=distributed_executor_backend,
enforce_eager=True,
gpu_memory_utilization=0.7) as vllm_model:
vllm_model.generate_greedy(prompts, 64)

View File

@@ -6,6 +6,7 @@ import pytest
from tests.e2e.conftest import VllmRunner
from tests.e2e.model_utils import check_outputs_equal
from vllm_ascend.ascend_config import clear_ascend_config
MODELS = [
# for MHA
@@ -102,6 +103,8 @@ def test_prefix_cache_with_ascend_scheduler(model: str,
gpu_memory_utilization=0.7) as vllm_model:
vllm_output = vllm_model.generate_greedy(INPUT_PROMPTS, max_tokens)
clear_ascend_config()
with VllmRunner(model,
additional_config={
'ascend_scheduler_config': {
@@ -116,6 +119,8 @@ def test_prefix_cache_with_ascend_scheduler(model: str,
prefix_cache_output = vllm_model.generate_greedy(
INPUT_PROMPTS, max_tokens)
clear_ascend_config()
with VllmRunner(model,
additional_config={
'ascend_scheduler_config': {
@@ -131,6 +136,8 @@ def test_prefix_cache_with_ascend_scheduler(model: str,
chunk_prefill_prefix_cache_output = vllm_model.generate_greedy(
INPUT_PROMPTS, max_tokens)
clear_ascend_config()
check_outputs_equal(
outputs_0_lst=vllm_output,
outputs_1_lst=prefix_cache_output,

View File

@@ -1,121 +0,0 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
# Adapted from vllm/tests/basic_correctness/test_basic_correctness.py
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import multiprocessing
import os
import torch
from vllm.distributed.parallel_state import (get_world_group,
init_distributed_environment)
from vllm.utils import update_environment_variables
from tests.e2e.conftest import cleanup_dist_env_and_memory
from vllm_ascend.distributed.device_communicators.pyhccl import \
PyHcclCommunicator
os.environ["TOKENIZERS_PARALLELISM"] = "true"
multiprocessing.set_start_method("spawn", force=True)
def _worker_entry(env, fn):
# `multiprocessing.Process` cannot accept environment variables directly
# so we need to pass the environment variables as arguments
# and update the environment variables in the function
update_environment_variables(env)
rank = int(os.environ['RANK'])
local_rank = int(os.environ['LOCAL_RANK'])
word_size = int(os.environ['WORLD_SIZE'])
distributed_init_method = "tcp://localhost:12345"
device = torch.device(f"npu:{local_rank}")
torch.npu.set_device(device)
init_distributed_environment(
world_size=word_size,
rank=rank,
distributed_init_method=distributed_init_method,
local_rank=local_rank,
backend="hccl")
fn()
cleanup_dist_env_and_memory()
def distributed_run(fn, world_size):
number_of_processes = world_size
processes: list[multiprocessing.Process] = []
for i in range(number_of_processes):
env: dict[str, str] = {}
env['RANK'] = str(i)
env['LOCAL_RANK'] = str(i)
env['WORLD_SIZE'] = str(number_of_processes)
env['LOCAL_WORLD_SIZE'] = str(number_of_processes)
p = multiprocessing.Process(target=_worker_entry, args=(env, fn))
processes.append(p)
p.start()
for p in processes:
p.join()
for p in processes:
assert p.exitcode == 0
def worker_fn():
pynccl_comm = PyHcclCommunicator(get_world_group().cpu_group,
device=get_world_group().device)
tensor = torch.ones(16, 1024, 1024,
dtype=torch.float32).npu(pynccl_comm.rank)
tensor = pynccl_comm.all_reduce(tensor)
torch.npu.synchronize()
assert torch.all(tensor == pynccl_comm.world_size).cpu().item()
def test_pyhccl():
distributed_run(worker_fn, 2)
def broadcast_worker_fn():
# Test broadcast for every root rank.
# Essentially this is an all-gather operation.
pyhccl_comm = PyHcclCommunicator(get_world_group().cpu_group,
device=get_world_group().device)
recv_tensors = [
torch.empty(16,
1024,
1024,
dtype=torch.float32,
device=pyhccl_comm.device)
for i in range(pyhccl_comm.world_size)
]
recv_tensors[pyhccl_comm.rank] = torch.ones(
16, 1024, 1024, dtype=torch.float32,
device=pyhccl_comm.device) * pyhccl_comm.rank
for i in range(pyhccl_comm.world_size):
pyhccl_comm.broadcast(recv_tensors[i], src=i)
# the broadcast op might be launched in a different stream
# need to synchronize to make sure the tensor is ready
torch.npu.synchronize()
assert torch.all(recv_tensors[i] == i).cpu().item()
def test_pyhccl_broadcast():
distributed_run(broadcast_worker_fn, 2)

View File

@@ -32,11 +32,9 @@ def test_models_distributed_Qwen3_MOE_TP2():
example_prompts = [
"Hello, my name is",
]
dtype = "half"
max_tokens = 5
with VllmRunner(
"Qwen/Qwen3-30B-A3B",
dtype=dtype,
tensor_parallel_size=2,
distributed_executor_backend="mp",
) as vllm_model:
@@ -47,11 +45,9 @@ def test_models_distributed_Qwen3_MOE_TP2_WITH_EP():
example_prompts = [
"Hello, my name is",
]
dtype = "half"
max_tokens = 5
with VllmRunner(
"Qwen/Qwen3-30B-A3B",
dtype=dtype,
tensor_parallel_size=2,
enable_expert_parallel=True,
distributed_executor_backend="mp",
@@ -64,12 +60,10 @@ def test_models_distributed_Qwen3_MOE_W8A8():
example_prompts = [
"Hello, my name is",
]
dtype = "auto"
max_tokens = 5
with VllmRunner(
snapshot_download("vllm-ascend/Qwen3-30B-A3B-W8A8"),
max_model_len=8192,
dtype=dtype,
tensor_parallel_size=2,
quantization="ascend",
enforce_eager=True,

View File

@@ -23,6 +23,7 @@ import os
from typing import Dict
from tests.e2e.conftest import VllmRunner
from vllm_ascend.ascend_config import clear_ascend_config
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
@@ -54,7 +55,6 @@ def _deepseek_torchair_test_fixture(
dtype="half",
tensor_parallel_size=tensor_parallel_size,
distributed_executor_backend="mp",
enforce_eager=False,
additional_config=additional_config,
) as vllm_model:
# use greedy sampler to make sure the generated results are fix
@@ -85,6 +85,8 @@ def test_e2e_deepseekv3_with_torchair():
}
_deepseek_torchair_test_fixture(additional_config)
clear_ascend_config()
def test_e2e_deepseekv3_with_torchair_ms_mla():
additional_config = {
@@ -95,6 +97,8 @@ def test_e2e_deepseekv3_with_torchair_ms_mla():
}
_deepseek_torchair_test_fixture(additional_config)
clear_ascend_config()
def test_e2e_deepseekv3_with_torchair_v1scheduler():
additional_config = {
@@ -104,6 +108,8 @@ def test_e2e_deepseekv3_with_torchair_v1scheduler():
}
_deepseek_torchair_test_fixture(additional_config, use_v1_schduler=True)
clear_ascend_config()
def _pangu_torchair_test_fixture(
additional_config: Dict,
@@ -131,7 +137,6 @@ def _pangu_torchair_test_fixture(
dtype="half",
tensor_parallel_size=tensor_parallel_size,
distributed_executor_backend="mp",
enforce_eager=False,
additional_config=additional_config,
enable_expert_parallel=True,
) as vllm_model:
@@ -163,6 +168,8 @@ def test_e2e_pangu_with_torchair():
}
_pangu_torchair_test_fixture(additional_config)
clear_ascend_config()
def _qwen_torchair_test_fixture(
model,
@@ -221,6 +228,9 @@ def _qwen_torchair_test_fixture(
def test_e2e_qwen2_with_torchair():
_qwen_torchair_test_fixture("Qwen/Qwen2.5-0.5B-Instruct", 2, False)
clear_ascend_config()
def test_e2e_qwen3_moe_with_torchair():
_qwen_torchair_test_fixture("Qwen/Qwen3-30B-A3B", 2, True)
clear_ascend_config()

View File

@@ -1,118 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
"""
Test the piecewise compilation with a simple model so that we
can exactly calculate the expected output and side effects.
"""
import pytest
import torch
from torch import nn
from torch.library import Library
from vllm.compilation.counter import compilation_counter
from vllm.compilation.decorators import support_torch_compile
from vllm.config import (CompilationConfig, CompilationLevel, VllmConfig,
set_current_vllm_config)
from vllm.utils import direct_register_custom_op
global_counter = 0
# create a library to hold the custom op
silly_lib = Library("silly", "FRAGMENT") # noqa
def silly_attention(q: torch.Tensor, k: torch.Tensor, v: torch.Tensor,
out: torch.Tensor) -> None:
global global_counter
global_counter += 1
print(f"{global_counter=}")
out.copy_(q)
out[0] += 1
def silly_attention_fake(q: torch.Tensor, k: torch.Tensor, v: torch.Tensor,
out: torch.Tensor) -> None:
return
direct_register_custom_op(
op_name="attention",
op_func=silly_attention,
mutates_args=["out"],
fake_impl=silly_attention_fake,
dispatch_key="PrivateUse1",
target_lib=silly_lib,
)
@support_torch_compile
class SillyModel(nn.Module):
def __init__(self,
*,
vllm_config: VllmConfig,
prefix: str = "",
**kwargs) -> None:
super().__init__()
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Overall effect:
x += 1
x[0] += 2
global_counter += 2
"""
x = x + 1
x = x + 2
out = torch.empty_like(x)
torch.ops.silly.attention(x, x, x, out)
x = out
x = x - 2
x = x - 1
out = torch.empty_like(x)
torch.ops.silly.attention(x, x, x, out)
x = out
x = x + 1
return x
@pytest.mark.skipif(True, reason="requires unreleased components")
def test_simple_piecewise_compile():
vllm_config = VllmConfig(compilation_config=CompilationConfig(
level=CompilationLevel.PIECEWISE,
use_inductor=False,
use_cudagraph=True,
splitting_ops=["silly.attention"],
cudagraph_copy_inputs=True,
cudagraph_capture_sizes=[1, 2],
))
vllm_config.compilation_config.pass_config.enable_fusion = False
with set_current_vllm_config(vllm_config):
model = SillyModel(vllm_config=vllm_config, prefix="")
inputs = torch.randn(100).npu()
kwargs = {
"num_graphs_seen": 1, # one graph for the model
"num_piecewise_graphs_seen": 5, # 2 * num_layers + 1
"num_piecewise_capturable_graphs_seen": 3, # 1 + num_layers
"num_backend_compilations": 3, # num_piecewise_capturable_graphs_seen
"num_cudagraph_captured":
6 # num_cudagraph_sizes * num_piecewise_capturable_graphs_seen
}
with compilation_counter.expect(kwargs):
model(inputs)
model(torch.randn(2).npu())
model(torch.randn(1).npu())
input = torch.zeros(2).npu()
global global_counter
global_counter = 0
output = model(input)
assert global_counter == 2
assert torch.allclose(output.cpu(), torch.tensor([3.0, 1.0]))
if __name__ == "__main__":
test_simple_piecewise_compile()

View File

@@ -1,3 +1,5 @@
import gc
import torch
from vllm_ascend.utils import enable_custom_op
@@ -18,7 +20,7 @@ def bgmv_expand_cpu_impl(x: torch.Tensor, w: torch.Tensor,
@torch.inference_mode()
def test_bgmv_expand() -> None:
def test_bgmv_expand():
B = 1
x = torch.randn([B, 16], dtype=torch.float)
w = torch.randn([64, 128, 16], dtype=torch.float16)
@@ -39,3 +41,6 @@ def test_bgmv_expand() -> None:
y_out,
atol=DEFAULT_ATOL,
rtol=DEFAULT_RTOL)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -1,3 +1,5 @@
import gc
import torch
from vllm_ascend.utils import enable_custom_op
@@ -18,7 +20,7 @@ def bgmv_shrink_cpu_impl(x: torch.Tensor, w: torch.Tensor,
@torch.inference_mode()
def test_bgmv_shrink() -> None:
def test_bgmv_shrink():
B = 1
x = torch.randn([B, 128], dtype=torch.float16)
w = torch.randn([64, 16, 128], dtype=torch.float16)
@@ -38,3 +40,6 @@ def test_bgmv_shrink() -> None:
y,
atol=DEFAULT_ATOL,
rtol=DEFAULT_RTOL)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -20,6 +20,7 @@
Run `pytest tests/ops/test_fused_moe.py`.
"""
import gc
from unittest.mock import MagicMock, patch
import pytest
@@ -173,7 +174,9 @@ def test_token_dispatcher_with_all_gather(
torch_output,
atol=4e-2,
rtol=1)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
@pytest.mark.parametrize("m", [1, 33, 64])
@@ -247,6 +250,10 @@ def test_select_experts(
assert topk_ids.dtype == torch.int32
assert row_idx.shape == (m, topk)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
@pytest.mark.parametrize("device", DEVICE)
def test_select_experts_invalid_scoring_func(device: str):
@@ -258,6 +265,9 @@ def test_select_experts_invalid_scoring_func(device: str):
use_grouped_topk=False,
renormalize=False,
scoring_func="invalid")
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
@pytest.mark.parametrize("device", DEVICE)
@@ -269,3 +279,6 @@ def test_select_experts_missing_group_params(device: str):
use_grouped_topk=True,
renormalize=False,
scoring_func="softmax")
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -14,6 +14,7 @@
# limitations under the License.
# This file is a part of the vllm-ascend project.
import gc
from types import SimpleNamespace
import pytest
@@ -169,3 +170,6 @@ def test_all_gather_comm_impl(
all_gather_hidden_states_out,
atol=atol,
rtol=rtol), "Final hidden states do not match."
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -4,6 +4,7 @@
# Adapted from
# https://github.com/vllm-project/vllm/blob/main/vllm/tests/kernels/test_rotary_embedding.py
import gc
from typing import Optional, Tuple, Union
import pytest
@@ -199,6 +200,9 @@ def test_rotary_embedding_quant_with_leading_dim(
ref_key,
atol=DEFAULT_ATOL,
rtol=DEFAULT_RTOL)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
class ModelwithRotaryEmbedding(nn.Module):
@@ -342,3 +346,6 @@ def test_capture_rotary_embedding_in_aclgraph(
output_reference,
atol=DEFAULT_ATOL,
rtol=DEFAULT_RTOL)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -1,3 +1,4 @@
import gc
from typing import Tuple
import pytest
@@ -92,3 +93,6 @@ def test_get_masked_input_and_mask(
rtol=1e-5,
atol=1e-5,
msg=f"Mask mismatch for case: {test_case}")
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -1,617 +0,0 @@
# SPDX-License-Identifier: Apache-2.0
from typing import Any, Optional
import pytest
import torch
import torch.nn.functional as F
from vllm.v1.sample.logits_processor import LogitsProcessors
from vllm.v1.sample.metadata import SamplingMetadata
from vllm.v1.spec_decode.metadata import SpecDecodeMetadata
from vllm_ascend.sample.rejection_sampler import (PLACEHOLDER_TOKEN_ID,
AscendRejectionSampler)
DEVICE = "npu"
@pytest.fixture
def rejection_sampler():
return AscendRejectionSampler()
def create_logits_tensor(output_token_ids: list[list[int]],
vocab_size: int = 100) -> torch.Tensor:
"""Helper function to create logits tensor that
will produce desired token ids on argmax"""
token_ids = [tokens[:-1] for tokens in output_token_ids]
num_total_tokens = sum(len(tokens) for tokens in token_ids)
logits = torch.full((num_total_tokens, vocab_size), -100.0, device=DEVICE)
start_loc = 0
for tokens in token_ids:
for j, token_id in enumerate(tokens):
logits[start_loc + j, token_id] = 100.0
start_loc += len(tokens)
return logits
def create_sampling_metadata(
all_greedy: bool,
temperature: Optional[torch.Tensor] = None,
top_k: Optional[torch.Tensor] = None,
top_p: Optional[torch.Tensor] = None,
generators: Optional[dict[int, Any]] = None,
) -> SamplingMetadata:
"""Create a v1 sampling metadata object with all_greedy set
to the given value. Either all greedy or all random sampling
is used.
"""
generators = generators or {}
if all_greedy:
temperature = None
else:
assert temperature is not None
return SamplingMetadata(temperature=temperature,
all_greedy=all_greedy,
all_random=not all_greedy,
top_p=top_p,
top_k=top_k,
generators=generators,
max_num_logprobs=0,
no_penalties=False,
prompt_token_ids=None,
frequency_penalties=torch.tensor([]),
presence_penalties=torch.tensor([]),
repetition_penalties=torch.tensor([]),
output_token_ids=[],
allowed_token_ids_mask=None,
bad_words_token_ids={},
logitsprocs=LogitsProcessors())
########################### Tests for Greedy Sampling ###################
def test_perfect_match(rejection_sampler):
"""Test when output tokens perfectly match speculated tokens"""
spec_tokens = [[1, 2, 3]]
output_tokens = [[1, 2, 3, 4]] # 4 is the bonus token
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor([[output_tokens[0][-1]]],
device=logits.device,
dtype=torch.int32)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor([[1, 2, 3, 4]],
dtype=torch.int,
device=logits.device)
assert torch.equal(output, expected)
def test_early_mismatch(rejection_sampler):
"""Test when there's an early mismatch in tokens"""
spec_tokens = [[1, 2, 3]]
output_tokens = [[1, 5, 3, 4]] # Mismatch at position 1
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor([[output_tokens[0][-1]]],
device=logits.device,
dtype=torch.int32)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor(
[[1, 5, PLACEHOLDER_TOKEN_ID, PLACEHOLDER_TOKEN_ID]],
dtype=torch.int,
device=logits.device,
)
assert torch.equal(output, expected)
def test_multiple_sequences(rejection_sampler):
"""Test handling multiple sequences of speculated tokens"""
spec_tokens = [[1, 2], [3]]
output_tokens = [[1, 2, 5], [3,
4]] # Two sequences with bonus tokens 5 and 4
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor(
[output_tokens[0][-1], output_tokens[1][-1]],
device=logits.device,
dtype=torch.int32).unsqueeze(1)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor([[1, 2, 5], [3, 4, PLACEHOLDER_TOKEN_ID]],
dtype=torch.int,
device=logits.device)
assert torch.equal(output, expected)
def test_single_token_sequence(rejection_sampler):
"""Test handling sequences with single token"""
spec_tokens = [[1]]
output_tokens = [[1, 2]] # Single token with bonus token 2
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor([[output_tokens[0][-1]]],
device=logits.device,
dtype=torch.int32)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor([[1, 2]], dtype=torch.int, device=logits.device)
assert torch.equal(output, expected)
def test_empty_sequence(rejection_sampler):
"""Test handling empty sequence of speculated tokens"""
spec_tokens: list[list[int]] = [[]]
output_tokens = [[5]] # Just the bonus token
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor([[output_tokens[0][-1]]],
device=logits.device,
dtype=torch.int32)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor([[5]], dtype=torch.int, device=logits.device)
assert torch.equal(output, expected)
def test_multiple_mismatches(rejection_sampler):
"""Test handling multiple sequences with mismatches"""
spec_tokens = [[1, 2, 3], [4, 5, 6]]
output_tokens = [[1, 2, 7, 6], [4, 8, 6,
9]] # Mismatches in both sequences
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor(
[output_tokens[0][-1], output_tokens[1][-1]],
device=logits.device,
dtype=torch.int32).unsqueeze(1)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected = torch.tensor(
[[1, 2, 7, PLACEHOLDER_TOKEN_ID],
[4, 8, PLACEHOLDER_TOKEN_ID, PLACEHOLDER_TOKEN_ID]],
dtype=torch.int,
device=logits.device,
)
assert torch.equal(output, expected)
@pytest.mark.parametrize(
"spec_tokens,output_tokens,expected",
[
([[1, 2]], [[1, 2, 3]], [[1, 2, 3]]), # Perfect match with bonus
([[1]], [[2, 3]], [[2, PLACEHOLDER_TOKEN_ID]]), # First mismatch
([[1, 2], [3, 4]], [[1, 5, 6], [3, 4, 7]],
[[1, 5, PLACEHOLDER_TOKEN_ID], [3, 4, 7]]), # Mixed matches
])
def test_parametrized_cases(rejection_sampler, spec_tokens, output_tokens,
expected):
"""Parametrized test for various matching scenarios"""
metadata = create_sampling_metadata(all_greedy=True)
logits = create_logits_tensor(output_tokens)
bonus_token_tensor = torch.tensor([tokens[-1] for tokens in output_tokens],
device=logits.device,
dtype=torch.int32).unsqueeze(1)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(spec_tokens,
device=logits.device)
output = rejection_sampler(
spec_decode_metadata,
draft_probs=None,
target_logits=logits,
bonus_token_ids=bonus_token_tensor,
sampling_metadata=metadata,
)
expected_tensor = torch.tensor(expected,
dtype=torch.int,
device=logits.device)
assert torch.equal(output, expected_tensor)
########################### Tests for Random Sampling ###################
@pytest.mark.parametrize("k", [1, 3, 5])
@pytest.mark.parametrize("vocab_size", [1000])
@pytest.mark.parametrize("batch_size", [1, 4, 8])
@pytest.mark.parametrize("frac_seeded", [0.0, 0.5])
@pytest.mark.parametrize("n_rep", [20])
def test_deterministic_when_seeded(
rejection_sampler,
k: int,
vocab_size: int,
batch_size: int,
frac_seeded: float,
n_rep: int,
):
num_tokens = batch_size * k
draft_probs = torch.rand(num_tokens,
vocab_size,
dtype=torch.float32,
device=DEVICE)
draft_probs = F.softmax(draft_probs, dim=-1)
target_logits = torch.rand_like(draft_probs)
bonus_token_ids = torch.randint(low=0,
high=vocab_size,
size=(batch_size, 1),
dtype=torch.int64,
device=DEVICE)
draft_token_ids = torch.randint(low=0,
high=vocab_size,
size=(batch_size, k),
dtype=torch.int64,
device=DEVICE)
seeded_mask = torch.rand(batch_size, dtype=torch.float32) <= frac_seeded
results = []
for _ in range(n_rep):
seeded_seqs = {
i: torch.Generator(device=DEVICE).manual_seed(i)
for i in range(batch_size) if seeded_mask[i]
}
temperature = torch.ones(batch_size,
dtype=torch.float32,
device=DEVICE)
sampling_metadata = create_sampling_metadata(all_greedy=False,
temperature=temperature,
generators=seeded_seqs)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(
draft_token_ids.tolist(), device=DEVICE)
rep_result = rejection_sampler(
spec_decode_metadata,
draft_probs=draft_probs,
target_logits=target_logits,
bonus_token_ids=bonus_token_ids,
sampling_metadata=sampling_metadata,
)
results.append(rep_result)
for i in range(batch_size):
if seeded_mask[i]:
for j in range(1, n_rep):
assert torch.equal(results[j][i], results[0][i])
@pytest.mark.skipif(True, reason="Test failed, need fix")
def test_rejection_sampling_approximates_target_distribution():
"""Verify rejection sampling approximates target distribution,
despite sampling from a potentially distinct draft distribution.
This is done by first creating a random target probability
distribution and a random draft probability distribution. We then
sample token ids from the rejection sampler using these draft
and target distributions. The samples are used to estimate
the output probability distribution, which we expect to approximate
the target distribution.
A basic distance metric is used to determine similarity between
distributions.
We expect that as we increase the number of samples,
the distance between the observed distribution and the target
distribution decreases. To measure this, we compare the distance
of the observed distribution against both the target distribution
and a uniform random distribution. We expect the distance between
the observed distribution and the target distribution to improve
much more than the distance improvement between the observed
distribution and the random distribution.
"""
torch.set_default_device(DEVICE)
vocab_size = 10
k = 2
num_reference_probs = 100
# Prepare draft, target, and reference probability distributions
draft_probs = F.softmax(torch.rand(vocab_size, dtype=torch.float32),
dim=-1)
target_logits = torch.rand(vocab_size, dtype=torch.float32)
target_probs = F.softmax(target_logits, dim=-1)
reference_probs = F.softmax(
torch.rand(num_reference_probs, vocab_size, dtype=torch.float32),
dim=-1,
)
sample_sizes = [10, 100, 1_000, 10_000, 100_000]
distance_wrt_reference: list[float] = []
distance_wrt_target: list[float] = []
for num_samples in sample_sizes:
# Sample using rejection sampling.
rej_sample_probs = estimate_rejection_sampling_pdf(
draft_probs, target_logits, k, vocab_size, num_samples)
rej_sample_probs = rej_sample_probs.to(DEVICE)
# Average distance from reference probs.
reference_vs_rejsample_dist = torch.dist(
reference_probs,
rej_sample_probs).item() / reference_probs.shape[0]
target_vs_rejsample_dist = torch.dist(target_probs,
rej_sample_probs).item()
distance_wrt_reference.append(reference_vs_rejsample_dist)
distance_wrt_target.append(target_vs_rejsample_dist)
relative_change_in_distance_wrt_target = get_ratio_first_to_last(
distance_wrt_target)
relative_change_in_distance_wrt_reference = get_ratio_first_to_last(
distance_wrt_reference)
print(f"{num_samples=} {target_vs_rejsample_dist=:.05f} "
f"{reference_vs_rejsample_dist=:.05f}")
print(f"{num_samples=} {relative_change_in_distance_wrt_target=:.02f} "
f"{relative_change_in_distance_wrt_reference=:.02f}")
relative_change_in_distance_wrt_target = get_ratio_first_to_last(
distance_wrt_target)
relative_change_in_distance_wrt_reference = get_ratio_first_to_last(
distance_wrt_reference)
expected_improvement_multiplier = 20
assert (relative_change_in_distance_wrt_target
> relative_change_in_distance_wrt_reference *
expected_improvement_multiplier)
def get_ratio_first_to_last(elements: list[float]) -> float:
return elements[0] / elements[-1]
def estimate_rejection_sampling_pdf(
draft_probs: torch.Tensor,
target_logits: torch.Tensor,
k: int,
vocab_size: int,
num_samples: int,
) -> torch.Tensor:
"""Estimate the probability distribution of the output tokens
using rejection sampling.
Args:
draft_probs: Draft probability distribution.
target_logits: Target logits.
num_samples: Number of samples to draw.
Returns:
Estimated probability distribution of the output tokens.
"""
rejection_sampler = AscendRejectionSampler()
num_tokens = num_samples * k
# Repeat draft probs num_samples * k times.
draft_probs = draft_probs.reshape(1, 1,
vocab_size).repeat(num_samples, k, 1)
# Repeat target probs num_tokens times.
target_logits = target_logits.reshape(1, vocab_size).repeat(num_tokens, 1)
# Randomly sample draft token ids from draft probs.
draft_token_ids = torch.multinomial(draft_probs[:, 0, :],
num_samples=k,
replacement=True).reshape(
num_samples, k)
draft_probs = draft_probs.view(num_tokens, vocab_size)
# Bonus tokens not used but required.
bonus_token_ids = torch.zeros((1, 1), dtype=torch.int64,
device=DEVICE).repeat(num_samples, 1)
temperature = torch.ones(num_samples, dtype=torch.float32, device=DEVICE)
sampling_metadata = create_sampling_metadata(all_greedy=False,
temperature=temperature)
spec_decode_metadata = SpecDecodeMetadata.make_dummy(
draft_token_ids.tolist(), device=bonus_token_ids.device)
output_token_ids = rejection_sampler(
spec_decode_metadata,
draft_probs=draft_probs,
target_logits=target_logits,
bonus_token_ids=bonus_token_ids,
sampling_metadata=sampling_metadata,
)
output_token_ids = output_token_ids[:, :-1].flatten()
hist = torch.histogram(output_token_ids.to(dtype=torch.float,
device="cpu"),
bins=vocab_size,
range=(0, vocab_size),
density=True)
return hist.hist
def _test_masked_logits(
rejection_sampler,
batch_size: int,
num_draft_tokens: int,
vocab_size: int,
target_logits: torch.Tensor,
unmasked_indices: torch.Tensor,
sampling_metadata: SamplingMetadata,
):
# Set up test parameters
num_tokens = batch_size * num_draft_tokens
# Create random draft probabilities.
draft_probs = torch.rand((num_tokens, vocab_size),
dtype=torch.float32,
device=DEVICE)
draft_probs = F.softmax(draft_probs, dim=-1)
# Randomly sample draft token ids from draft probs
draft_token_ids = torch.multinomial(draft_probs, num_samples=1)
draft_token_ids = draft_token_ids.reshape(batch_size, num_draft_tokens)
draft_token_ids = draft_token_ids.tolist()
# Bonus tokens not used but required
bonus_token_ids = torch.zeros((batch_size, 1),
dtype=torch.int64,
device=DEVICE)
# Create spec decode metadata
spec_decode_metadata = SpecDecodeMetadata.make_dummy(
draft_token_ids,
device=DEVICE,
)
# Run rejection sampling
output_token_ids = rejection_sampler(
spec_decode_metadata,
draft_probs=draft_probs,
target_logits=target_logits,
bonus_token_ids=bonus_token_ids,
sampling_metadata=sampling_metadata,
)
# Remove bonus tokens and reshape
output_token_ids = output_token_ids[:, :-1].flatten().tolist()
# Check that all sampled tokens are within the unmasked indices.
for i in range(num_tokens):
token_id = output_token_ids[i]
if token_id == PLACEHOLDER_TOKEN_ID:
continue
assert token_id in unmasked_indices[i]
@pytest.mark.parametrize("top_k", [1, 5, 99])
def test_top_k(rejection_sampler, top_k):
"""Test rejection sampling with top-k sampling"""
vocab_size = 100
batch_size = 100
num_draft_tokens = 3
num_tokens = batch_size * num_draft_tokens
# Randomly create top-k indices.
top_k_indices = [
torch.randperm(vocab_size, device=DEVICE)[:top_k]
for _ in range(num_tokens)
]
top_k_indices = torch.stack(top_k_indices)
# Create logits with the uniform distribution.
target_logits = torch.zeros((num_tokens, vocab_size), device=DEVICE)
# Increment the logits for top-k indices, a little bit more than the other
# ones. If the masking is effective, the non-topk indices will never be
# sampled despite the small difference in logits.
for i in range(num_tokens):
target_logits[i, top_k_indices[i]] += 0.1
# Create sampling metadata
temperature = torch.ones(batch_size, dtype=torch.float32, device=DEVICE)
sampling_metadata = create_sampling_metadata(
all_greedy=False,
temperature=temperature,
top_k=torch.tensor([top_k] * batch_size,
device=DEVICE,
dtype=torch.int64),
)
_test_masked_logits(
rejection_sampler,
batch_size=batch_size,
num_draft_tokens=num_draft_tokens,
vocab_size=vocab_size,
target_logits=target_logits,
unmasked_indices=top_k_indices,
sampling_metadata=sampling_metadata,
)
@pytest.mark.parametrize("top_p", [0.5, 0.9, 0.99])
def test_top_p(rejection_sampler, top_p):
"""Test rejection sampling with top-p sampling"""
vocab_size = 100
batch_size = 100
num_draft_tokens = 3
num_tokens = batch_size * num_draft_tokens
# Create logits with the uniform distribution.
target_logits = torch.randn((num_tokens, vocab_size), device=DEVICE)
temperature = torch.ones(batch_size, dtype=torch.float32, device=DEVICE)
rescaled_logits = target_logits / temperature
logits_sort, logits_idx = rescaled_logits.sort(dim=-1, descending=False)
probs_sort = logits_sort.softmax(dim=-1)
probs_sum = probs_sort.cumsum(dim=-1)
top_p_mask = probs_sum <= 1 - top_p
# at least one
top_p_mask[:, -1] = False
# Get the top-p indices.
top_p_indices = []
for i in range(num_tokens):
top_p_indices.append(logits_idx[i][~top_p_mask[i]].tolist())
# Create sampling metadata
sampling_metadata = create_sampling_metadata(
all_greedy=False,
temperature=temperature,
top_p=torch.tensor([top_p] * batch_size,
device=DEVICE,
dtype=torch.float32),
)
_test_masked_logits(
rejection_sampler,
batch_size=batch_size,
num_draft_tokens=num_draft_tokens,
vocab_size=vocab_size,
target_logits=target_logits,
unmasked_indices=top_p_indices,
sampling_metadata=sampling_metadata,
)

View File

@@ -7,6 +7,8 @@ from typing import Any
import pytest
from vllm import LLM, SamplingParams
from tests.e2e.conftest import VllmRunner
@pytest.fixture
def test_prompts():
@@ -72,19 +74,16 @@ def test_ngram_correctness(
ref_llm = LLM(model=model_name, max_model_len=1024, enforce_eager=True)
ref_outputs = ref_llm.chat(test_prompts, sampling_config)
del ref_llm
spec_llm = LLM(
model=model_name,
speculative_config={
"method": "ngram",
"prompt_lookup_max": 5,
"prompt_lookup_min": 3,
"num_speculative_tokens": 3,
},
max_model_len=1024,
enforce_eager=True,
)
spec_outputs = spec_llm.chat(test_prompts, sampling_config)
with VllmRunner(model_name,
speculative_config={
"method": "ngram",
"prompt_lookup_max": 5,
"prompt_lookup_min": 3,
"num_speculative_tokens": 3,
},
max_model_len=1024,
enforce_eager=True) as runner:
spec_outputs = runner.model.chat(test_prompts, sampling_config)
matches = 0
misses = 0
for ref_output, spec_output in zip(ref_outputs, spec_outputs):
@@ -98,7 +97,6 @@ def test_ngram_correctness(
# Heuristic: expect at least 70% of the prompts to match exactly
# Upon failure, inspect the outputs to check for inaccuracy.
assert matches > int(0.7 * len(ref_outputs))
del spec_llm
@pytest.mark.skipif(True, reason="oom in CI, fix me")
@@ -121,23 +119,24 @@ def test_eagle_correctness(
del ref_llm
spec_model_name = eagle3_model_name() if use_eagle3 else eagle_model_name()
spec_llm = LLM(
model=model_name,
trust_remote_code=True,
enable_chunked_prefill=True,
max_num_seqs=1,
max_num_batched_tokens=2048,
gpu_memory_utilization=0.6,
speculative_config={
"method": "eagle3" if use_eagle3 else "eagle",
"model": spec_model_name,
"num_speculative_tokens": 2,
"max_model_len": 128,
},
max_model_len=128,
enforce_eager=True,
)
spec_outputs = spec_llm.chat(test_prompts, sampling_config)
with VllmRunner(
model_name,
trust_remote_code=True,
enable_chunked_prefill=True,
max_num_seqs=1,
max_num_batched_tokens=2048,
gpu_memory_utilization=0.6,
speculative_config={
"method": "eagle3" if use_eagle3 else "eagle",
"model": spec_model_name,
"num_speculative_tokens": 2,
"max_model_len": 128,
},
max_model_len=128,
enforce_eager=True,
) as runner:
spec_outputs = runner.model.chat(test_prompts, sampling_config)
matches = 0
misses = 0
for ref_output, spec_output in zip(ref_outputs, spec_outputs):
@@ -151,4 +150,3 @@ def test_eagle_correctness(
# Heuristic: expect at least 66% of the prompts to match exactly
# Upon failure, inspect the outputs to check for inaccuracy.
assert matches > int(0.66 * len(ref_outputs))
del spec_llm

View File

@@ -21,16 +21,13 @@ Run `pytest tests/compile/test_aclgraph.py`.
"""
import pytest
import torch
from vllm import LLM, SamplingParams
from vllm import SamplingParams
from tests.e2e.conftest import VllmRunner
from tests.e2e.model_utils import check_outputs_equal
MODELS = [
"Qwen/Qwen2.5-0.5B-Instruct",
# TODO: REVERT ME when oom is fixed
# "vllm-ascend/Qwen3-30B-A3B-Puring"
"Qwen/Qwen3-0.6B",
]
@@ -46,17 +43,19 @@ def test_models_with_aclgraph(
]
sampling_params = SamplingParams(max_tokens=max_tokens, temperature=0.0)
# TODO: change to use vllmrunner when the registry of custom op is solved
# while running pytest
vllm_model = LLM(model, max_model_len=1024)
vllm_aclgraph_outputs = vllm_model.generate(prompts, sampling_params)
del vllm_model
torch.npu.empty_cache()
with VllmRunner(
model,
max_model_len=1024,
enforce_eager=False,
) as runner:
vllm_aclgraph_outputs = runner.model.generate(prompts, sampling_params)
vllm_model = LLM(model, enforce_eager=True, max_model_len=1024)
vllm_eager_outputs = vllm_model.generate(prompts, sampling_params)
del vllm_model
torch.npu.empty_cache()
with VllmRunner(
model,
max_model_len=1024,
enforce_eager=True,
) as runner:
vllm_eager_outputs = runner.model.generate(prompts, sampling_params)
vllm_aclgraph_outputs_list = []
for output in vllm_aclgraph_outputs:
@@ -74,21 +73,3 @@ def test_models_with_aclgraph(
name_0="vllm_eager_outputs",
name_1="vllm_aclgraph_outputs",
)
def test_deepseek_raises_error(monkeypatch: pytest.MonkeyPatch) -> None:
with monkeypatch.context() as m:
m.setenv("VLLM_USE_MODELSCOPE", "True")
with pytest.raises(NotImplementedError) as excinfo:
VllmRunner("deepseek-ai/DeepSeek-V2-Lite-Chat",
max_model_len=1024,
enforce_eager=False)
assert "ACL Graph does not support deepseek" in str(excinfo.value)
@pytest.mark.parametrize("model", MODELS)
def test_ray_backend_sets_no_compilation(model: str) -> None:
runner = VllmRunner(model,
enforce_eager=False,
distributed_executor_backend="ray")
assert runner.model.llm_engine.vllm_config.compilation_config.level == 0

View File

@@ -4,6 +4,7 @@ import pytest
from tests.e2e.conftest import VllmRunner
from tests.e2e.model_utils import check_outputs_equal
from vllm_ascend.ascend_config import clear_ascend_config
MODEL = "Qwen/Qwen3-0.6B"
@@ -26,6 +27,8 @@ def test_concurrent_partial_prefill():
for output in outputs:
assert len(output.outputs) == 1
clear_ascend_config()
def test_prefix_cache_stats_is_recorded():
with VllmRunner(MODEL,
@@ -45,13 +48,17 @@ def test_prefix_cache_stats_is_recorded():
outputs = vllm_model.model.generate([input_tokens])
assert outputs[0].num_cached_tokens == 128
clear_ascend_config()
@pytest.mark.parametrize("max_tokens",
[4]) # cannot align results when max_tokens > 4
@pytest.mark.parametrize("chunked_prefill_token_size", [16])
def test_chunked_prefill_with_ascend_scheduler(
example_prompts, max_tokens: int,
chunked_prefill_token_size: int) -> None:
max_tokens: int, chunked_prefill_token_size: int) -> None:
example_prompts = [
"vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs."
]
max_num_seqs = chunked_prefill_token_size
max_num_batched_tokens = chunked_prefill_token_size
with VllmRunner(MODEL,
@@ -63,7 +70,6 @@ def test_chunked_prefill_with_ascend_scheduler(
},
max_num_seqs=max_num_seqs,
max_num_batched_tokens=max_num_batched_tokens,
enforce_eager=True,
max_model_len=2048,
gpu_memory_utilization=0.7) as vllm_model:
chunked_prefill_output = vllm_model.generate_greedy(
@@ -75,7 +81,6 @@ def test_chunked_prefill_with_ascend_scheduler(
'enabled': True,
},
},
enforce_eager=True,
max_model_len=2048,
gpu_memory_utilization=0.7) as vllm_model:
vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens)
@@ -86,3 +91,4 @@ def test_chunked_prefill_with_ascend_scheduler(
name_0="vllm_output",
name_1="chunked_prefill_output",
)
clear_ascend_config()

View File

@@ -17,10 +17,13 @@
# limitations under the License.
#
import gc
import torch
from vllm import LLM, SamplingParams
from vllm import SamplingParams
from vllm.utils import GiB_bytes
from tests.e2e.conftest import VllmRunner
from tests.e2e.utils import fork_new_process_for_each_test
from vllm_ascend.device_allocator.camem import CaMemAllocator
@@ -57,29 +60,37 @@ def test_basic_camem():
output = x + y + z
assert torch.allclose(output, torch.ones_like(output) * 3)
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
@fork_new_process_for_each_test
def test_end_to_end():
free, total = torch.npu.mem_get_info()
used_bytes_baseline = total - free # in case other process is running
llm = LLM("Qwen/Qwen2.5-0.5B-Instruct", enable_sleep_mode=True)
prompt = "How are you?"
sampling_params = SamplingParams(temperature=0, max_tokens=10)
output = llm.generate(prompt, sampling_params)
# the benefit of `llm.sleep(level=2)` is mainly CPU memory usage,
# which is difficult to measure in the test. therefore, we only
# test sleep level 1 here.
llm.sleep(level=1)
with VllmRunner("Qwen/Qwen3-0.6B",
enforce_eager=True,
enable_sleep_mode=True) as runner:
free_gpu_bytes_after_sleep, total = torch.npu.mem_get_info()
used_bytes = total - free_gpu_bytes_after_sleep - used_bytes_baseline
# now the memory usage should be less than the model weights
# (0.5B model, 1GiB weights)
assert used_bytes < 1 * GiB_bytes
output = runner.model.generate(prompt, sampling_params)
# the benefit of `llm.sleep(level=2)` is mainly CPU memory usage,
# which is difficult to measure in the test. therefore, we only
# test sleep level 1 here.
runner.model.sleep(level=1)
llm.wake_up()
output2 = llm.generate(prompt, sampling_params)
free_gpu_bytes_after_sleep, total = torch.npu.mem_get_info()
used_bytes = total - free_gpu_bytes_after_sleep - used_bytes_baseline
# now the memory usage should be less than the model weights
# (0.5B model, 1GiB weights)
assert used_bytes < 1 * GiB_bytes
runner.model.wake_up()
output2 = runner.model.generate(prompt, sampling_params)
# cmp output
assert output[0].outputs[0].text == output2[0].outputs[0].text

View File

@@ -19,6 +19,8 @@ Compare the outputs of vLLM with and without aclgraph.
Run `pytest tests/compile/test_aclgraph.py`.
"""
import gc
import pytest
import torch
from vllm import SamplingParams
@@ -73,3 +75,7 @@ def test_models(
print(f"Token IDs cosine similarity: {similarity.item()}")
assert similarity > 0.95
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -16,24 +16,29 @@
# This file is a part of the vllm-ascend project.
# Adapted from vllm/tests/basic_correctness/test_basic_correctness.py
#
from collections.abc import Sequence
from typing import Optional
from modelscope import snapshot_download # type: ignore[import-untyped]
from tests.e2e.conftest import HfRunner
from tests.e2e.utils import check_embeddings_close, matryoshka_fy
from tests.e2e.conftest import HfRunner, VllmRunner
from tests.e2e.utils import check_embeddings_close
def run_embedding_correctness_test(
hf_model: "HfRunner",
inputs: list[str],
vllm_outputs: Sequence[list[float]],
dimensions: Optional[int] = None,
):
hf_outputs = hf_model.encode(inputs)
if dimensions:
hf_outputs = matryoshka_fy(hf_outputs, dimensions)
def test_embed_models_correctness():
queries = ['What is the capital of China?', 'Explain gravity']
model_name = snapshot_download("Qwen/Qwen3-Embedding-0.6B")
with VllmRunner(
model_name,
task="embed",
enforce_eager=True,
) as vllm_runner:
vllm_outputs = vllm_runner.encode(queries)
with HfRunner(
model_name,
dtype="float32",
is_sentence_transformer=True,
) as hf_runner:
hf_outputs = hf_runner.encode(queries)
check_embeddings_close(
embeddings_0_lst=hf_outputs,
@@ -42,27 +47,3 @@ def run_embedding_correctness_test(
name_1="vllm",
tol=1e-2,
)
# dummy to avoid pytest collect nothing and exit code 5
def test_dummy():
assert True
def test_embed_models_correctness(hf_runner, vllm_runner):
queries = ['What is the capital of China?', 'Explain gravity']
model_name = snapshot_download("Qwen/Qwen3-Embedding-0.6B")
with vllm_runner(
model_name,
task="embed",
enforce_eager=True,
) as vllm_model:
vllm_outputs = vllm_model.encode(queries)
with hf_runner(
model_name,
dtype="float32",
is_sentence_transformer=True,
) as hf_model:
run_embedding_correctness_test(hf_model, queries, vllm_outputs)

View File

@@ -28,7 +28,7 @@ from vllm.sampling_params import GuidedDecodingParams, SamplingParams
from tests.e2e.conftest import VllmRunner
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"
MODEL_NAME = "Qwen/Qwen3-0.6B"
GuidedDecodingBackend = ["xgrammar", "guidance", "outlines"]
@@ -92,7 +92,6 @@ def test_guided_json_completion(guided_decoding_backend: str,
with VllmRunner(
MODEL_NAME,
seed=0,
dtype="auto",
guided_decoding_backend=guided_decoding_backend,
) as vllm_model:
prompts = [
@@ -131,7 +130,6 @@ def test_guided_regex(guided_decoding_backend: str, sample_regex):
with VllmRunner(
MODEL_NAME,
seed=0,
dtype="auto",
guided_decoding_backend=guided_decoding_backend,
) as vllm_model:
prompts = [

View File

@@ -47,9 +47,11 @@ def do_sample(llm: vllm.LLM, lora_path: str, lora_id: int) -> list[str]:
def test_ilama_lora(ilama_lora_files):
with VllmRunner(snapshot_download(MODEL_PATH),
enable_lora=True,
dtype="half",
max_loras=4,
max_model_len=1024,
max_num_seqs=16) as vllm_model:
max_num_seqs=16,
enforce_eager=True) as vllm_model:
output1 = do_sample(vllm_model.model, ilama_lora_files, lora_id=1)
for i in range(len(EXPECTED_LORA_OUTPUT)):

View File

@@ -1,166 +0,0 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
# Adapted from vllm/tests/basic_correctness/test_basic_correctness.py
#
"""Compare the short outputs of HF and vLLM when using greedy sampling.
Run `pytest tests/test_offline_inference.py`.
"""
import os
from unittest.mock import patch
import pytest
import vllm # noqa: F401
from vllm import SamplingParams
from vllm.assets.audio import AudioAsset
from vllm.assets.image import ImageAsset
import vllm_ascend # noqa: F401
from tests.e2e.conftest import VllmRunner
MODELS = [
"Qwen/Qwen2.5-0.5B-Instruct",
"Qwen/Qwen3-0.6B-Base",
]
MULTIMODALITY_VL_MODELS = ["Qwen/Qwen2.5-VL-3B-Instruct"]
MULTIMODALITY_AUDIO_MODELS = ["Qwen/Qwen2-Audio-7B-Instruct"]
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
AUDIO_ASSETS = [AudioAsset("mary_had_lamb"), AudioAsset("winning_call")]
AUDIO_PROMPT_TEMPLATES = {
1: "What is recited in the audio?",
2: "What sport and what nursery rhyme are referenced?"
}
@pytest.mark.parametrize("model", MODELS)
@pytest.mark.parametrize("dtype", ["half", "float16"])
@pytest.mark.parametrize("max_tokens", [5])
def test_models(model: str, dtype: str, max_tokens: int) -> None:
# 5042 tokens for gemma2
# gemma2 has alternating sliding window size of 4096
# we need a prompt with more than 4096 tokens to test the sliding window
prompt = "The following numbers of the sequence " + ", ".join(
str(i) for i in range(1024)) + " are:"
example_prompts = [prompt]
with VllmRunner(model,
max_model_len=8192,
dtype=dtype,
enforce_eager=True,
gpu_memory_utilization=0.7) as vllm_model:
vllm_model.generate_greedy(example_prompts, max_tokens)
@pytest.mark.parametrize("model", MULTIMODALITY_VL_MODELS)
def test_multimodal_vl(model, prompt_template, vllm_runner):
image = ImageAsset("cherry_blossom") \
.pil_image.convert("RGB")
img_questions = [
"What is the content of this image?",
"Describe the content of this image in detail.",
"What's in the image?",
"Where is this image taken?",
]
images = [image] * len(img_questions)
prompts = prompt_template(img_questions)
with vllm_runner(model,
max_model_len=4096,
mm_processor_kwargs={
"min_pixels": 28 * 28,
"max_pixels": 1280 * 28 * 28,
"fps": 1,
}) as vllm_model:
vllm_model.generate_greedy(prompts=prompts,
images=images,
max_tokens=64)
def prepare_audio_inputs(audio_count: int):
audio_prompt = "".join([
f"Audio {idx+1}: <|audio_bos|><|AUDIO|><|audio_eos|>\n"
for idx in range(audio_count)
])
question = AUDIO_PROMPT_TEMPLATES[audio_count]
prompt = ("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
"<|im_start|>user\n"
f"{audio_prompt}{question}<|im_end|>\n"
"<|im_start|>assistant\n")
mm_data = {
"audio":
[asset.audio_and_sample_rate for asset in AUDIO_ASSETS[:audio_count]]
}
inputs = {"prompt": prompt, "multi_modal_data": mm_data}
return inputs
@pytest.mark.parametrize("model", MULTIMODALITY_AUDIO_MODELS)
@pytest.mark.parametrize("audio_count", [2])
@pytest.mark.parametrize("max_tokens", [10])
def test_multimodal_audio(model: str, audio_count: int,
max_tokens: int) -> None:
inputs = prepare_audio_inputs(audio_count)
sampling_params = SamplingParams(temperature=0.2,
max_tokens=max_tokens,
stop_token_ids=None)
with VllmRunner(model,
max_model_len=4096,
max_num_seqs=5,
enforce_eager=False,
dtype="bfloat16",
limit_mm_per_prompt={"audio": audio_count},
gpu_memory_utilization=0.9) as vllm_model:
vllm_model.generate(inputs, sampling_params=sampling_params)
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_TOPK_TOPP_OPTIMIZATION": "1"})
def test_models_topk() -> None:
example_prompts = [
"Hello, my name is",
"The president of the United States is",
"The capital of France is",
"The future of AI is",
]
sampling_params = SamplingParams(max_tokens=5,
temperature=0.0,
top_k=50,
top_p=0.9)
with VllmRunner("Qwen/Qwen2.5-0.5B-Instruct",
max_model_len=8192,
dtype="float16",
enforce_eager=True,
gpu_memory_utilization=0.7) as vllm_model:
vllm_model.generate(example_prompts, sampling_params)
def test_models_prompt_logprobs() -> None:
example_prompts = [
"Hello, my name is",
]
with VllmRunner("Qwen/Qwen2.5-0.5B-Instruct",
max_model_len=8192,
dtype="float16",
enforce_eager=True,
gpu_memory_utilization=0.7) as vllm_model:
vllm_model.generate_greedy_logprobs(example_prompts,
max_tokens=5,
num_logprobs=1)

View File

@@ -16,6 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import gc
import os
import time
from unittest.mock import patch
@@ -50,6 +51,10 @@ def test_execue_duration_enabled_discrepancy():
assert diff <= 0.5, (
f"CPU={cpu_duration:.2f}ms, NPU={npu_durations['forward']:.2f}ms")
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
def test_execue_duration_disabled():
a = torch.randn(100, 100).npu()
@@ -60,3 +65,7 @@ def test_execue_duration_disabled():
torch.npu.synchronize()
npu_durations = ProfileExecuteDuration().pop_captured_sync()
assert not npu_durations
gc.collect()
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()

View File

@@ -1,29 +0,0 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# This file is a part of the vllm-ascend project.
# Adapted from vllm/tests/basic_correctness/test_basic_correctness.py
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import torch
from vllm_ascend.distributed.device_communicators.pyhccl_wrapper import \
HCCLLibrary
def test_hcclGetUniqueId():
torch.npu.set_device(0)
lib = HCCLLibrary()
unique_id = lib.hcclGetUniqueId()
assert unique_id is not None

View File

@@ -15,27 +15,20 @@
# limitations under the License.
# This file is a part of the vllm-ascend project.
#
import pytest
from modelscope import snapshot_download # type: ignore[import-untyped]
from tests.e2e.conftest import VllmRunner
MODELS = [
"vllm-ascend/DeepSeek-V2-Lite-W8A8",
"vllm-ascend/Qwen2.5-0.5B-Instruct-W8A8"
]
@pytest.mark.parametrize("model", MODELS)
def test_quant_W8A8(example_prompts, model):
def test_quant_W8A8():
max_tokens = 5
model_path = snapshot_download(model)
example_prompts = [
"vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs."
]
with VllmRunner(
model_path,
snapshot_download("vllm-ascend/Qwen2.5-0.5B-Instruct-W8A8"),
max_model_len=8192,
enforce_eager=True,
dtype="auto",
gpu_memory_utilization=0.7,
quantization="ascend",
) as vllm_model:

View File

@@ -16,94 +16,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Optional
from vllm import SamplingParams
import torch
# Set tolerance to 1 for quant ops
DEFAULT_ATOL = 1e-3
DEFAULT_RTOL = 1e-3
from tests.e2e.conftest import VllmRunner
def apply_top_k_top_p(
logits: torch.Tensor,
k: Optional[torch.Tensor],
p: Optional[torch.Tensor],
) -> torch.Tensor:
"""Apply top-k and top-p masks to the logits.
def test_models_topk() -> None:
example_prompts = [
"Hello, my name is",
]
sampling_params = SamplingParams(max_tokens=5,
temperature=0.0,
top_k=50,
top_p=0.9)
If a top-p is used, this function will sort the logits tensor,
which can be slow for large batches.
The logits tensor may be updated in-place.
"""
logits_sort, logits_idx = logits.sort(dim=-1, descending=False)
if k is not None:
# Apply top-k.
top_k_mask = logits_sort.size(1) - k.to(torch.long) # shape: B
# Get all the top_k values.
top_k_mask = logits_sort.gather(1, top_k_mask.unsqueeze(dim=1))
top_k_mask = logits_sort < top_k_mask
logits_sort.masked_fill_(top_k_mask, -float("inf"))
if p is not None:
# Apply top-p.
probs_sort = logits_sort.softmax(dim=-1)
probs_sum = torch.cumsum(probs_sort, dim=-1, out=probs_sort)
top_p_mask = probs_sum <= 1 - p.unsqueeze(dim=1)
# at least one
top_p_mask[:, -1] = False
logits_sort.masked_fill_(top_p_mask, -float("inf"))
# Re-sort the probabilities.
logits = logits_sort.scatter(dim=-1, index=logits_idx, src=logits_sort)
return logits
with VllmRunner("Qwen/Qwen3-0.6B",
max_model_len=8192,
gpu_memory_utilization=0.7) as runner:
runner.generate(example_prompts, sampling_params)
def apply_top_k_top_p_new(
logits: torch.Tensor,
k: Optional[torch.Tensor],
p: Optional[torch.Tensor],
) -> torch.Tensor:
batch_size, vocab_size = logits.shape
logits_sort, logits_idx = logits.sort(dim=-1, descending=False)
def test_models_prompt_logprobs() -> None:
example_prompts = [
"Hello, my name is",
]
# Apply top-k.
boundary = logits_sort.gather(1, (vocab_size - k).unsqueeze(dim=1))
top_k_mask = logits_sort < boundary
logits_sort.masked_fill_(top_k_mask, -float("inf"))
if p is not None:
# Apply top-p.
cutoff = top_k_mask.sum(dim=-1).min()
probs_sort = logits_sort.softmax(dim=-1)[:, cutoff:]
probs_sum = probs_sort.cumsum(dim=-1)
top_p_mask = probs_sum > 1 - p.unsqueeze(dim=1)
top_p_mask[:, -1] = True
strides = torch.arange(0,
batch_size * vocab_size,
vocab_size,
device=logits.device)
flatten_idx = logits_idx[:, cutoff:] + strides.unsqueeze(dim=1)
valid_idx = torch.masked_select(flatten_idx, top_p_mask)
logits_flatten = logits.flatten()
valid_logits = torch.index_select(logits_flatten, 0, valid_idx)
logits = torch.empty_like(logits_flatten).fill_(-float("inf"))
logits[valid_idx] = valid_logits
return logits.reshape(batch_size, vocab_size)
# test with leading dimension and merge seqlen and batch_size as num_tokens
@torch.inference_mode()
def test_apply_top_k_top_p() -> None:
logits = torch.randn((128, 7168)).npu()
k = torch.Tensor([-1]).int().npu()
p = torch.Tensor([1]).int().npu()
logits_new = apply_top_k_top_p_new(logits, k, p)
logits_old = apply_top_k_top_p(logits, k, p)
# Compare the results.
torch.testing.assert_close(logits_new,
logits_old,
atol=DEFAULT_ATOL,
rtol=DEFAULT_RTOL)
with VllmRunner("Qwen/Qwen3-0.6B",
max_model_len=8192,
gpu_memory_utilization=0.7) as runner:
runner.generate_greedy_logprobs(example_prompts,
max_tokens=5,
num_logprobs=1)

View File

@@ -0,0 +1,89 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
# Copyright 2023 The vLLM team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
# Adapted from vllm/tests/basic_correctness/test_basic_correctness.py
#
"""Compare the short outputs of HF and vLLM when using greedy sampling.
Run `pytest tests/test_offline_inference.py`.
"""
import os
import pytest
from vllm import SamplingParams
from vllm.assets.audio import AudioAsset
from vllm.assets.image import ImageAsset
from tests.e2e.conftest import VllmRunner
os.environ["PYTORCH_NPU_ALLOC_CONF"] = "max_split_size_mb:256"
@pytest.mark.skip(reason="fix me")
def test_multimodal_vl(prompt_template):
image = ImageAsset("cherry_blossom") \
.pil_image.convert("RGB")
img_questions = [
"What is the content of this image?",
"Describe the content of this image in detail.",
"What's in the image?",
"Where is this image taken?",
]
images = [image] * len(img_questions)
prompts = prompt_template(img_questions)
with VllmRunner("Qwen/Qwen2.5-VL-3B-Instruct",
max_model_len=4096,
mm_processor_kwargs={
"min_pixels": 28 * 28,
"max_pixels": 1280 * 28 * 28,
"fps": 1,
},
enforce_eager=True) as vllm_model:
vllm_model.generate_greedy(prompts=prompts,
images=images,
max_tokens=64)
def test_multimodal_audio():
audio_prompt = "".join([
f"Audio {idx+1}: <|audio_bos|><|AUDIO|><|audio_eos|>\n"
for idx in range(2)
])
question = "What sport and what nursery rhyme are referenced?"
prompt = ("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
"<|im_start|>user\n"
f"{audio_prompt}{question}<|im_end|>\n"
"<|im_start|>assistant\n")
mm_data = {
"audio": [
asset.audio_and_sample_rate for asset in
[AudioAsset("mary_had_lamb"),
AudioAsset("winning_call")]
]
}
inputs = {"prompt": prompt, "multi_modal_data": mm_data}
sampling_params = SamplingParams(temperature=0.2,
max_tokens=10,
stop_token_ids=None)
with VllmRunner("Qwen/Qwen2-Audio-7B-Instruct",
max_model_len=4096,
max_num_seqs=5,
dtype="bfloat16",
limit_mm_per_prompt={"audio": 2},
gpu_memory_utilization=0.9) as runner:
runner.generate(inputs, sampling_params=sampling_params)