diff --git a/pyproject.toml b/pyproject.toml index 665950c5..061db046 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,34 +51,25 @@ line-length = 120 # Folder to be modified exclude = [ # Batch (1) - "tests/e2e/__init__.py", - "tests/e2e/310p/", - "tests/e2e/conftest.py", - "tests/e2e/doctests/", - "tests/e2e/model_utils.py", - "tests/e2e/models/", - "tests/e2e/multicard/2-cards/", + # "tests/e2e/__init__.py", + # "tests/e2e/310p/", + # "tests/e2e/conftest.py", + # "tests/e2e/doctests/", + # "tests/e2e/model_utils.py", + # "tests/e2e/models/", + # "tests/e2e/multicard/2-cards/", # Batch (2) "tests/e2e/multicard/4-cards/", "tests/e2e/nightly/multi_node/", - - # Batch (3) - "tests/e2e/nightly/single_node/models/", - - # Batch (4) - "tests/e2e/nightly/single_node/ops/", - - # Batch (5) - # "tests/e2e/singlecard/", - - # Batch (6) - "tests/e2e/nightly/single_node/ops/singlecard_ops/triton/", "tests/e2e/singlecard/pooling/", "tests/e2e/singlecard/spec_decode/", "tests/e2e/utils.py", "tests/e2e/vllm_interface/", "tests/e2e/weekly/", + + # Batch (3) + "tests/e2e/nightly/single_node/", "tests/ut/", ] diff --git a/tests/e2e/310p/multicard/test_vl_model_multicard.py b/tests/e2e/310p/multicard/test_vl_model_multicard.py index 0016ea86..308597bd 100644 --- a/tests/e2e/310p/multicard/test_vl_model_multicard.py +++ b/tests/e2e/310p/multicard/test_vl_model_multicard.py @@ -15,28 +15,23 @@ # limitations under the License. # This file is a part of the vllm-ascend project. -import sys import os +import sys # Add 310p directory to sys.path current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) # 310p directory sys.path.insert(0, parent_dir) +# ruff: noqa: E402 from test_utils import run_vl_model_test + def test_qwen3_vl_8b_tp2_fp16(): """Qwen3-VL-8B dual-card FP16 test""" - run_vl_model_test( - model_name="Qwen/Qwen3-VL-8B-Instruct", - tensor_parallel_size=2, - max_tokens=5 - ) + run_vl_model_test(model_name="Qwen/Qwen3-VL-8B-Instruct", tensor_parallel_size=2, max_tokens=5) + def test_qwen3_vl_32b_tp1_fp16(): """Qwen3-VL-32B 4-card FP16 test""" - run_vl_model_test( - model_name="Qwen/Qwen3-VL-32B-Instruct", - tensor_parallel_size=4, - max_tokens=5 - ) \ No newline at end of file + run_vl_model_test(model_name="Qwen/Qwen3-VL-32B-Instruct", tensor_parallel_size=4, max_tokens=5) diff --git a/tests/e2e/310p/singlecard/test_vl_model_singlecard.py b/tests/e2e/310p/singlecard/test_vl_model_singlecard.py index 380116c7..3c66c96e 100644 --- a/tests/e2e/310p/singlecard/test_vl_model_singlecard.py +++ b/tests/e2e/310p/singlecard/test_vl_model_singlecard.py @@ -15,20 +15,18 @@ # limitations under the License. # This file is a part of the vllm-ascend project. -import sys import os +import sys # Add 310p directory to sys.path current_dir = os.path.dirname(os.path.abspath(__file__)) parent_dir = os.path.dirname(current_dir) # 310p directory sys.path.insert(0, parent_dir) +# ruff: noqa: E402 from test_utils import run_vl_model_test + def test_qwen3_vl_8b_tp1_fp16(): """Qwen3-VL-8B single-card FP16 test""" - run_vl_model_test( - model_name="Qwen/Qwen3-VL-8B-Instruct", - tensor_parallel_size=1, - max_tokens=5 - ) \ No newline at end of file + run_vl_model_test(model_name="Qwen/Qwen3-VL-8B-Instruct", tensor_parallel_size=1, max_tokens=5) diff --git a/tests/e2e/310p/test_utils.py b/tests/e2e/310p/test_utils.py index f9c521fd..eb617343 100644 --- a/tests/e2e/310p/test_utils.py +++ b/tests/e2e/310p/test_utils.py @@ -15,10 +15,12 @@ # limitations under the License. # This file is a part of the vllm-ascend project. -from tests.e2e.conftest import VllmRunner -from PIL import Image import os +from PIL import Image + +from tests.e2e.conftest import VllmRunner + def get_test_image(): """Get the image object for testing""" @@ -32,14 +34,12 @@ def get_test_prompts(): return ["<|image_pad|>Describe this image in detail."] -def run_vl_model_test(model_name: str, - tensor_parallel_size: int, - max_tokens: int, - dtype: str = "float16", - enforce_eager: bool = True): +def run_vl_model_test( + model_name: str, tensor_parallel_size: int, max_tokens: int, dtype: str = "float16", enforce_eager: bool = True +): """ Generic visual language model test function - + Args: model_name: Model name, e.g., "Qwen/Qwen3-VL-4B" tensor_parallel_size: Tensor parallel size @@ -52,9 +52,6 @@ def run_vl_model_test(model_name: str, prompts = get_test_prompts() with VllmRunner( - model_name, - tensor_parallel_size=tensor_parallel_size, - enforce_eager=enforce_eager, - dtype=dtype + model_name, tensor_parallel_size=tensor_parallel_size, enforce_eager=enforce_eager, dtype=dtype ) as vllm_model: - vllm_model.generate_greedy(prompts, max_tokens, images=images) \ No newline at end of file + vllm_model.generate_greedy(prompts, max_tokens, images=images) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index d6ca2558..edef3664 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -32,7 +32,7 @@ import threading import time import traceback from pathlib import Path -from typing import Any, Optional, Tuple, TypeVar, Union +from typing import Any, TypeVar import numpy as np import openai @@ -44,23 +44,20 @@ from modelscope import snapshot_download # type: ignore[import-untyped] from PIL import Image from requests.exceptions import RequestException from torch import nn -from transformers import (AutoConfig, AutoModelForCausalLM, AutoTokenizer, - BatchEncoding, BatchFeature) +from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer, BatchEncoding, BatchFeature from transformers.models.auto.auto_factory import _BaseAutoModelClass from vllm import LLM, SamplingParams -from vllm.config.model import (ConvertOption, RunnerOption, - _get_and_verify_dtype) +from vllm.config.model import ConvertOption, RunnerOption, _get_and_verify_dtype from vllm.inputs import TextPrompt from vllm.outputs import RequestOutput from vllm.platforms import current_platform from vllm.transformers_utils.utils import maybe_model_redirect from vllm.utils.network_utils import get_open_port -from tests.e2e.model_utils import (TokensTextLogprobs, - TokensTextLogprobsPromptLogprobs) -from tests.e2e.nightly.multi_node.scripts.multi_node_config import ( - DisaggregatedPrefillCfg, NodeInfo) +from tests.e2e.model_utils import TokensTextLogprobs, TokensTextLogprobsPromptLogprobs +from tests.e2e.nightly.multi_node.scripts.multi_node_config import DisaggregatedPrefillCfg, NodeInfo from vllm_ascend.ascend_config import clear_ascend_config + # TODO: remove this part after the patch merged into vllm, if # we not explicitly patch here, some of them might be effectiveless # in pytest scenario @@ -70,41 +67,41 @@ adapt_patch(True) adapt_patch(False) from vllm.distributed.parallel_state import ( # noqa E402 - destroy_distributed_environment, destroy_model_parallel) + destroy_distributed_environment, + destroy_model_parallel, +) _T = TypeVar("_T", nn.Module, torch.Tensor, BatchEncoding, BatchFeature, dict) _M = TypeVar("_M") -_PromptMultiModalInput = Union[list[_M], list[list[_M]]] +_PromptMultiModalInput = list[_M] | list[list[_M]] PromptImageInput = _PromptMultiModalInput[Image.Image] -PromptAudioInput = _PromptMultiModalInput[Tuple[np.ndarray, int]] +PromptAudioInput = _PromptMultiModalInput[tuple[np.ndarray, int]] PromptVideoInput = _PromptMultiModalInput[np.ndarray] logger = logging.getLogger(__name__) _TEST_DIR = os.path.dirname(__file__) _LONG_PROMPTS = [os.path.join(_TEST_DIR, "prompts", "long_prompt.txt")] -DISAGG_EPD_PROXY_SCRIPT = Path( - __file__ -).parent.parent.parent / "examples" / "disaggregated_encoder" / "disagg_epd_proxy.py" +DISAGG_EPD_PROXY_SCRIPT = ( + Path(__file__).parent.parent.parent / "examples" / "disaggregated_encoder" / "disagg_epd_proxy.py" +) def _check_npu_memory_worker(target_free_percentage: float, max_wait_seconds: float): - import torch_npu # type: ignore - # We can try to clean up memory in this subprocess, though it mostly affects this process. # But if there are any lingering contexts in this process (unlikely for a fresh spawn), it helps. gc.collect() torch.npu.empty_cache() - + _, total_npu_memory = torch.npu.mem_get_info() start_time = time.time() while True: free_bytes, _ = torch.npu.mem_get_info() if free_bytes / total_npu_memory >= target_free_percentage: - print(f'check_npu_memory_worker: npu free memory decreased target value.') + print("check_npu_memory_worker: npu free memory decreased target value.") return # Success elapsed = time.time() - start_time @@ -113,7 +110,7 @@ def _check_npu_memory_worker(target_free_percentage: float, max_wait_seconds: fl print( f"Timeout: NPU memory free size did not reach " f"{target_free_percentage} of total npu memory within {max_wait_seconds} seconds.", - file=sys.stderr + file=sys.stderr, ) sys.exit(1) # Failure @@ -135,21 +132,19 @@ def wait_until_npu_memory_free(target_free_percentage: float = 0.5, max_wait_sec target_free_percentage (float): Target free memory percentage of total. max_wait_seconds (float): Maximum wait time in seconds. """ + def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # Clean up non-NPU resources in the main process cleanup_dist_env_and_memory() - + # Use a spawned subprocess to check NPU memory to avoid initializing NPU in the main process ctx = multiprocessing.get_context("spawn") - p = ctx.Process( - target=_check_npu_memory_worker, - args=(target_free_percentage, max_wait_seconds) - ) + p = ctx.Process(target=_check_npu_memory_worker, args=(target_free_percentage, max_wait_seconds)) p.start() p.join() - + if p.exitcode != 0: raise TimeoutError( f"Timeout: NPU memory free size did not reach " @@ -157,7 +152,9 @@ def wait_until_npu_memory_free(target_free_percentage: float = 0.5, max_wait_sec ) return func(*args, **kwargs) + return wrapper + return decorator @@ -168,9 +165,10 @@ def cleanup_dist_env_and_memory(shutdown_ray: bool = False): torch.distributed.destroy_process_group() if shutdown_ray: import ray # Lazy import Ray + ray.shutdown() gc.collect() - + # Only clean NPU cache if NPU is already initialized/available in this process. # This prevents accidental initialization of NPU context in the main process, # which would break subsequent forks. @@ -180,7 +178,6 @@ def cleanup_dist_env_and_memory(shutdown_ray: bool = False): class MooncakeLauncher: - def __init__( self, mooncake_port, @@ -228,14 +225,12 @@ class MooncakeLauncher: class RemoteOpenAIServer: DUMMY_API_KEY = "token-abc123" # vLLM's OpenAI server does not need API key - def _start_server(self, model: str, server_cmd: list[str], - env_dict: Optional[dict[str, str]]) -> None: - """Subclasses override this method to customize server process launch - """ + def _start_server(self, model: str, server_cmd: list[str], env_dict: dict[str, str] | None) -> None: + """Subclasses override this method to customize server process launch""" env = os.environ.copy() # the current process might initialize npu, # to be safe, we should use spawn method - env['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' + env["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" if env_dict is not None: env.update(env_dict) logger.info(f"Starting server with command: {' '.join(server_cmd)}") @@ -247,47 +242,41 @@ class RemoteOpenAIServer: ) def __init__( - self, - model: str, - vllm_serve_args: Union[list[str], str], - *, - server_host: str = '0.0.0.0', - server_port: int = 8080, - env_dict: Optional[dict[str, str]] = None, - seed: Optional[int] = None, - auto_port: bool = True, - nodes_info: Optional[list[NodeInfo]] = None, - disaggregated_prefill: Optional[DisaggregatedPrefillCfg] = None, - proxy_port: Optional[int] = None, - max_wait_seconds: Optional[float] = None, - override_hf_configs: Optional[dict[str, Any]] = None) -> None: + self, + model: str, + vllm_serve_args: list[str] | str, + *, + server_host: str = "0.0.0.0", + server_port: int = 8080, + env_dict: dict[str, str] | None = None, + seed: int | None = None, + auto_port: bool = True, + nodes_info: list[NodeInfo] | None = None, + disaggregated_prefill: DisaggregatedPrefillCfg | None = None, + proxy_port: int | None = None, + max_wait_seconds: float | None = None, + override_hf_configs: dict[str, Any] | None = None, + ) -> None: if isinstance(vllm_serve_args, str): vllm_serve_args = shlex.split(vllm_serve_args) else: vllm_serve_args = ["vllm", "serve", model, *vllm_serve_args] if auto_port: if "-p" in vllm_serve_args or "--port" in vllm_serve_args: - raise ValueError("You have manually specified the port " - "when `auto_port=True`.") + raise ValueError("You have manually specified the port when `auto_port=True`.") # No need for a port if using unix sockets if "--uds" not in vllm_serve_args: # Don't mutate the input args - vllm_serve_args = vllm_serve_args + [ - "--port", str(get_open_port()) - ] + vllm_serve_args = vllm_serve_args + ["--port", str(get_open_port())] if seed is not None: if "--seed" in vllm_serve_args: - raise ValueError("You have manually specified the seed " - f"when `seed={seed}`.") + raise ValueError(f"You have manually specified the seed when `seed={seed}`.") vllm_serve_args = vllm_serve_args + ["--seed", str(seed)] if override_hf_configs is not None: - vllm_serve_args = vllm_serve_args + [ - "--hf-overrides", - json.dumps(override_hf_configs) - ] + vllm_serve_args = vllm_serve_args + ["--hf-overrides", json.dumps(override_hf_configs)] self.host = str(server_host) self.port = int(server_port) @@ -303,9 +292,7 @@ class RemoteOpenAIServer: assert proxy_port is not None, "for disaggregated_prefill, proxy port must be provided" self._wait_for_server_pd(timeout=max_wait_seconds) else: - self._wait_for_multiple_servers( - [(self.host, self.url_for("health"))], - timeout=max_wait_seconds) + self._wait_for_multiple_servers([(self.host, self.url_for("health"))], timeout=max_wait_seconds) def __enter__(self): return self @@ -313,7 +300,7 @@ class RemoteOpenAIServer: def __exit__(self, exc_type, exc_value, traceback): self._terminate_server() - def _poll(self) -> Optional[int]: + def _poll(self) -> int | None: """Subclasses override this method to customize process polling""" return self.proc.poll() @@ -345,24 +332,23 @@ class RemoteOpenAIServer: def url_health(ip: str, port: int) -> str: return f"http://{ip}:{port}/health" - targets = [(node_info.ip, url_health(node_info.ip, self.port)) - for node_info in self.nodes_info if not node_info.headless] + targets = [ + (node_info.ip, url_health(node_info.ip, self.port)) + for node_info in self.nodes_info + if not node_info.headless + ] # Wait for proxy ready master_node = self.nodes_info[0] url_proxy = f"http://{master_node.ip}:{proxy_port}/healthcheck" # Wait for master node proxy first - self._wait_for_multiple_servers([(master_node.ip, url_proxy)], - timeout=timeout) + self._wait_for_multiple_servers([(master_node.ip, url_proxy)], timeout=timeout) # Then wait for all api_server nodes self._wait_for_multiple_servers(targets=targets, timeout=timeout) - def _wait_for_multiple_servers(self, - targets, - timeout: float, - log_interval: float = 30.0): + def _wait_for_multiple_servers(self, targets, timeout: float, log_interval: float = 30.0): """ targets: List[(node_ip, url)] log_interval @@ -396,9 +382,7 @@ class RemoteOpenAIServer: # check unexpected exit result = self._poll() if result is not None and result != 0: - raise RuntimeError( - f"Server at {node_ip} exited unexpectedly." - ) from None + raise RuntimeError(f"Server at {node_ip} exited unexpectedly.") from None if should_log: last_log_time = now @@ -444,35 +428,31 @@ class RemoteOpenAIServer: def get_async_client(self, **kwargs): if "timeout" not in kwargs: kwargs["timeout"] = 600 - return openai.AsyncOpenAI(base_url=self.url_for("v1"), - api_key=self.DUMMY_API_KEY, - max_retries=0, - **kwargs) + return openai.AsyncOpenAI(base_url=self.url_for("v1"), api_key=self.DUMMY_API_KEY, max_retries=0, **kwargs) class RemoteEPDServer(RemoteOpenAIServer): - def _start_server(self, model: str, server_cmd: list[str], - env_dict: Optional[dict[str, str]]) -> None: - """Subclasses override this method to customize server process launch - """ + def _start_server(self, model: str, server_cmd: list[str], env_dict: dict[str, str] | None) -> None: + """Subclasses override this method to customize server process launch""" raise NotImplementedError("RemoteEPDServer should use _start_server_with_prefix instead") - def __init__(self, - vllm_serve_args: Union[list[str], list[list[str]]], - server_host: str = '0.0.0.0', - env_dict: Optional[dict[str, str]] = None, - max_wait_seconds: Optional[float] = 2800) -> None: - + def __init__( + self, + vllm_serve_args: list[str] | list[list[str]], + server_host: str = "0.0.0.0", + env_dict: dict[str, str] | None = None, + max_wait_seconds: float | None = 2800, + ) -> None: self._proc_list = [] self.env_dict: dict[str, str] = {} if env_dict is not None: self.env_dict.update(env_dict) - self.env_dict['VLLM_ALLOW_LONG_MAX_MODEL_LEN'] = "1" - self.env_dict['VLLM_USE_V1'] = "1" - self.env_dict['PYTORCH_NPU_ALLOC_CONF'] = "expandable_segments:True" - self.env_dict['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn' + self.env_dict["VLLM_ALLOW_LONG_MAX_MODEL_LEN"] = "1" + self.env_dict["VLLM_USE_V1"] = "1" + self.env_dict["PYTORCH_NPU_ALLOC_CONF"] = "expandable_segments:True" + self.env_dict["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" self.vllm_serve_args_list = [] self.health_url_list = [] @@ -484,8 +464,7 @@ class RemoteEPDServer(RemoteOpenAIServer): self.vllm_serve_args_list.append([str(arg) for arg in args_copy]) else: self.vllm_serve_args_list = [ - [str(arg) for arg in sublist] - for sublist in copy.deepcopy(vllm_serve_args) + [str(arg) for arg in sublist] for sublist in copy.deepcopy(vllm_serve_args) ] else: raise RuntimeError("vllm_serves_args must be a list") @@ -493,7 +472,7 @@ class RemoteEPDServer(RemoteOpenAIServer): serve_arg_cmd = ["vllm", "serve"] for i, vllm_serve_arg in enumerate(self.vllm_serve_args_list): - self.env_dict['ASCEND_RT_VISIBLE_DEVICES'] = str(i) + self.env_dict["ASCEND_RT_VISIBLE_DEVICES"] = str(i) if isinstance(vllm_serve_arg, list): if "--port" not in vllm_serve_arg: raise ValueError("You have manually specified the port ") @@ -514,16 +493,13 @@ class RemoteEPDServer(RemoteOpenAIServer): self.health_url_list.append(super().url_for("health")) vllm_serve_arg = [*serve_arg_cmd, *vllm_serve_arg] - proc = self._start_server_with_prefix(vllm_serve_arg, self.env_dict, - f"[VLLM_{i}] ") + proc = self._start_server_with_prefix(vllm_serve_arg, self.env_dict, f"[VLLM_{i}] ") self._proc_list.append(proc) timeout_value = float(max_wait_seconds) if max_wait_seconds is not None else 2800.0 - super()._wait_for_multiple_servers([(self.host, url) - for url in self.health_url_list], - timeout=timeout_value) + super()._wait_for_multiple_servers([(self.host, url) for url in self.health_url_list], timeout=timeout_value) - def _poll(self) -> Optional[int]: + def _poll(self) -> int | None: return None def _delete_shm(self) -> None: @@ -542,31 +518,23 @@ class RemoteEPDServer(RemoteOpenAIServer): def _read_output(self, pipe, prefix): try: with pipe: - for line in iter(pipe.readline, ''): + for line in iter(pipe.readline, ""): if line: - print(f"{prefix}: {line}", end='') + print(f"{prefix}: {line}", end="") except Exception as e: print(f"error: {e}") traceback.print_exc() - def _start_server_with_prefix(self, server_cmd: list[str], - env_dict: Optional[dict[str, str]], log_prefix: str): + def _start_server_with_prefix(self, server_cmd: list[str], env_dict: dict[str, str] | None, log_prefix: str): env = os.environ.copy() if env_dict is not None: env.update(env_dict) - proc = subprocess.Popen(server_cmd, - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True, - bufsize=1) - stdout_thread = threading.Thread(target=self._read_output, - args=(proc.stdout, log_prefix), - daemon=True) - stderr_thread = threading.Thread(target=self._read_output, - args=(proc.stderr, log_prefix), - daemon=True) + proc = subprocess.Popen( + server_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, bufsize=1 + ) + stdout_thread = threading.Thread(target=self._read_output, args=(proc.stdout, log_prefix), daemon=True) + stderr_thread = threading.Thread(target=self._read_output, args=(proc.stderr, log_prefix), daemon=True) stdout_thread.start() stderr_thread.start() @@ -579,27 +547,21 @@ class RemoteEPDServer(RemoteOpenAIServer): parent = psutil.Process(proc.pid) children = parent.children(recursive=True) for child in children: - try: + with contextlib.suppress(psutil.NoSuchProcess): child.terminate() - except psutil.NoSuchProcess: - pass gone, still_alive = psutil.wait_procs(children, timeout=10) for child in still_alive: - try: + with contextlib.suppress(psutil.NoSuchProcess): child.kill() - except psutil.NoSuchProcess: - pass try: parent.terminate() parent.wait(timeout=10) except (psutil.NoSuchProcess, psutil.TimeoutExpired): - try: + with contextlib.suppress(psutil.NoSuchProcess): parent.kill() - except psutil.NoSuchProcess: - pass def __enter__(self): """Context manager entry point.""" @@ -611,13 +573,13 @@ class RemoteEPDServer(RemoteOpenAIServer): class DisaggEpdProxy(RemoteEPDServer): - - def __init__(self, - proxy_args: Optional[Union[list[str], str]] = None, - env_dict: Optional[dict[str, str]] = None, - server_host: str = '0.0.0.0', - max_wait_seconds: Optional[float] = 2800) -> None: - + def __init__( + self, + proxy_args: list[str] | str | None = None, + env_dict: dict[str, str] | None = None, + server_host: str = "0.0.0.0", + max_wait_seconds: float | None = 2800, + ) -> None: if proxy_args is None: proxy_args_list: list[str] = [] elif isinstance(proxy_args, str): @@ -648,8 +610,7 @@ class DisaggEpdProxy(RemoteEPDServer): self.port = int(port_str) timeout_value = float(max_wait_seconds) if max_wait_seconds is not None else 2800.0 - super()._wait_for_multiple_servers( - [(self.host, super().url_for("health"))], timeout=timeout_value) + super()._wait_for_multiple_servers([(self.host, super().url_for("health"))], timeout=timeout_value) def __enter__(self): """Context manager entry point.""" @@ -661,23 +622,22 @@ class DisaggEpdProxy(RemoteEPDServer): class VllmRunner: - def __init__( self, model_name: str, runner: RunnerOption = "auto", convert: ConvertOption = "auto", - tokenizer_name: Optional[str] = None, + tokenizer_name: str | None = None, tokenizer_mode: str = "auto", - max_model_len: Optional[int] = 1024, + max_model_len: int | None = 1024, dtype: str = "auto", disable_log_stats: bool = True, tensor_parallel_size: int = 1, block_size: int = 16, enable_chunked_prefill: bool = True, swap_space: int = 4, - enforce_eager: Optional[bool] = False, - quantization: Optional[str] = None, + enforce_eager: bool | None = False, + quantization: str | None = None, **kwargs, ) -> None: self.model = LLM( @@ -701,17 +661,13 @@ class VllmRunner: def get_inputs( self, - prompts: Union[list[str], list[torch.Tensor], list[int]], - images: Optional[PromptImageInput] = None, - videos: Optional[PromptVideoInput] = None, - audios: Optional[PromptAudioInput] = None, + prompts: list[str] | list[torch.Tensor] | list[int], + images: PromptImageInput | None = None, + videos: PromptVideoInput | None = None, + audios: PromptAudioInput | None = None, ) -> list[TextPrompt]: - - if any(x is not None and len(x) != len(prompts) - for x in [images, videos, audios]): - raise ValueError( - "All non-None multimodal inputs must have the same length as " - "prompts") + if any(x is not None and len(x) != len(prompts) for x in [images, videos, audios]): + raise ValueError("All non-None multimodal inputs must have the same length as prompts") inputs = [] for i, prompt in enumerate(prompts): @@ -719,13 +675,11 @@ class VllmRunner: if images is not None and (image := images[i]) is not None: multi_modal_data["image"] = image if videos is not None and (video := videos[i]) is not None: - multi_modal_data["video"] = video # type: ignore + multi_modal_data["video"] = video # type: ignore if audios is not None and (audio := audios[i]) is not None: - multi_modal_data["audio"] = audio # type: ignore + multi_modal_data["audio"] = audio # type: ignore - text_prompt_kwargs: dict[str, Any] = { - "multi_modal_data": multi_modal_data or None - } + text_prompt_kwargs: dict[str, Any] = {"multi_modal_data": multi_modal_data or None} if isinstance(prompt, str): text_prompt_kwargs["prompt"] = prompt elif isinstance(prompt, list): @@ -739,21 +693,16 @@ class VllmRunner: def generate( self, - prompts: Union[list[str], list[torch.Tensor]], + prompts: list[str] | list[torch.Tensor], sampling_params: SamplingParams, - images: Optional[PromptImageInput] = None, - videos: Optional[PromptVideoInput] = None, - audios: Optional[PromptAudioInput] = None, + images: PromptImageInput | None = None, + videos: PromptVideoInput | None = None, + audios: PromptAudioInput | None = None, **kwargs: Any, ) -> list[tuple[list[list[int]], list[str]]]: - inputs = self.get_inputs(prompts, - images=images, - videos=videos, - audios=audios) + inputs = self.get_inputs(prompts, images=images, videos=videos, audios=audios) - req_outputs = self.model.generate(inputs, - sampling_params=sampling_params, - **kwargs) + req_outputs = self.model.generate(inputs, sampling_params=sampling_params, **kwargs) outputs: list[tuple[list[list[int]], list[str]]] = [] for req_output in req_outputs: @@ -780,99 +729,83 @@ class VllmRunner: output_str = sample.text output_ids = list(sample.token_ids) output_logprobs = sample.logprobs - outputs.append((output_ids, output_str, output_logprobs, - req_output.prompt_logprobs)) + outputs.append((output_ids, output_str, output_logprobs, req_output.prompt_logprobs)) return outputs def generate_w_logprobs( self, prompts: list[str], sampling_params: SamplingParams, - images: Optional[PromptImageInput] = None, - audios: Optional[PromptAudioInput] = None, - videos: Optional[PromptVideoInput] = None, + images: PromptImageInput | None = None, + audios: PromptAudioInput | None = None, + videos: PromptVideoInput | None = None, **kwargs: Any, - ) -> Union[list[TokensTextLogprobs], - list[TokensTextLogprobsPromptLogprobs]]: - inputs = self.get_inputs(prompts, - images=images, - videos=videos, - audios=audios) + ) -> list[TokensTextLogprobs] | list[TokensTextLogprobsPromptLogprobs]: + inputs = self.get_inputs(prompts, images=images, videos=videos, audios=audios) - req_outputs = self.model.generate(inputs, - sampling_params=sampling_params, - **kwargs) + req_outputs = self.model.generate(inputs, sampling_params=sampling_params, **kwargs) - toks_str_logsprobs_prompt_logprobs = ( - self._final_steps_generate_w_logprobs(req_outputs)) + toks_str_logsprobs_prompt_logprobs = self._final_steps_generate_w_logprobs(req_outputs) # Omit prompt logprobs if not required by sampling params - return ([x[0:-1] for x in toks_str_logsprobs_prompt_logprobs] - if sampling_params.prompt_logprobs is None else - toks_str_logsprobs_prompt_logprobs) + return ( + [x[0:-1] for x in toks_str_logsprobs_prompt_logprobs] + if sampling_params.prompt_logprobs is None + else toks_str_logsprobs_prompt_logprobs + ) def generate_greedy( self, - prompts: Union[list[str], list[torch.Tensor]], + prompts: list[str] | list[torch.Tensor], max_tokens: int, - images: Optional[PromptImageInput] = None, - videos: Optional[PromptVideoInput] = None, - audios: Optional[PromptAudioInput] = None, + images: PromptImageInput | None = None, + videos: PromptVideoInput | None = None, + audios: PromptAudioInput | None = None, **kwargs: Any, ) -> list[tuple[list[int], str]]: greedy_params = SamplingParams(temperature=0.0, max_tokens=max_tokens) - outputs = self.generate(prompts, - greedy_params, - images=images, - videos=videos, - audios=audios, - **kwargs) - return [(output_ids[0], output_str[0]) - for output_ids, output_str in outputs] + outputs = self.generate(prompts, greedy_params, images=images, videos=videos, audios=audios, **kwargs) + return [(output_ids[0], output_str[0]) for output_ids, output_str in outputs] def generate_greedy_logprobs( self, prompts: list[str], max_tokens: int, - num_logprobs: Optional[int], - num_prompt_logprobs: Optional[int] = None, - images: Optional[PromptImageInput] = None, - audios: Optional[PromptAudioInput] = None, - videos: Optional[PromptVideoInput] = None, - stop_token_ids: Optional[list[int]] = None, - stop: Optional[list[str]] = None, + num_logprobs: int | None, + num_prompt_logprobs: int | None = None, + images: PromptImageInput | None = None, + audios: PromptAudioInput | None = None, + videos: PromptVideoInput | None = None, + stop_token_ids: list[int] | None = None, + stop: list[str] | None = None, **kwargs: Any, - ) -> Union[list[TokensTextLogprobs], - list[TokensTextLogprobsPromptLogprobs]]: + ) -> list[TokensTextLogprobs] | list[TokensTextLogprobsPromptLogprobs]: greedy_logprobs_params = SamplingParams( temperature=0.0, max_tokens=max_tokens, logprobs=num_logprobs, prompt_logprobs=num_prompt_logprobs, stop_token_ids=stop_token_ids, - stop=stop) + stop=stop, + ) - return self.generate_w_logprobs(prompts, - greedy_logprobs_params, - images=images, - audios=audios, - videos=videos, - **kwargs) + return self.generate_w_logprobs( + prompts, greedy_logprobs_params, images=images, audios=audios, videos=videos, **kwargs + ) def classify(self, prompts: list[str]) -> list[list[float]]: req_outputs = self.model.classify(prompts) return [req_output.outputs.probs for req_output in req_outputs] - def embed(self, - prompts: list[str], - images: Optional[PromptImageInput] = None, - videos: Optional[PromptVideoInput] = None, - audios: Optional[PromptAudioInput] = None, - *args, - **kwargs) -> list[list[float]]: - inputs = self.get_inputs(prompts, - images=images, - videos=videos, - audios=audios) + def embed( + self, + prompts: list[str], + images: PromptImageInput | None = None, + videos: PromptVideoInput | None = None, + audios: PromptAudioInput | None = None, + *args, + **kwargs, + ) -> list[list[float]]: + inputs = self.get_inputs(prompts, images=images, videos=videos, audios=audios) req_outputs = self.model.embed(inputs, *args, **kwargs) return [req_output.outputs.embedding for req_output in req_outputs] @@ -887,8 +820,8 @@ class VllmRunner: def score( self, - text_1: Union[str, list[str]], - text_2: Union[str, list[str]], + text_1: str | list[str], + text_2: str | list[str], *args, **kwargs, ) -> list[float]: @@ -905,14 +838,11 @@ class VllmRunner: class HfRunner: - def get_default_device(self): + return "cpu" if current_platform.is_cpu() else current_platform.device_type - return ("cpu" - if current_platform.is_cpu() else current_platform.device_type) - - def wrap_device(self, x: _T, device: Optional[str] = None) -> _T: - if x is None or isinstance(x, (bool, )): + def wrap_device(self, x: _T, device: str | None = None) -> _T: + if x is None or isinstance(x, (bool,)): return x if device is None: @@ -931,7 +861,7 @@ class HfRunner: model_name: str, dtype: str = "auto", *, - model_kwargs: Optional[dict[str, Any]] = None, + model_kwargs: dict[str, Any] | None = None, trust_remote_code: bool = True, is_sentence_transformer: bool = False, is_cross_encoder: bool = False, @@ -984,14 +914,15 @@ class HfRunner: ) # in case some unquantized custom models are not in same dtype - if (getattr(model, "quantization_method", None) is None - and any(p.dtype != self.dtype - for p in model.parameters())): + if getattr(model, "quantization_method", None) is None and any( + p.dtype != self.dtype for p in model.parameters() + ): model = model.to(dtype=self.dtype) - if (getattr(model, "quantization_method", None) != "bitsandbytes" - and len({p.device - for p in model.parameters()}) < 2): + if ( + getattr(model, "quantization_method", None) != "bitsandbytes" + and len({p.device for p in model.parameters()}) < 2 + ): model = model.to(device=self.device) self.model = model @@ -1006,6 +937,7 @@ class HfRunner: # don't put this import at the top level # it will call torch.cuda.device_count() from transformers import AutoProcessor # noqa: F401 + self.processor = AutoProcessor.from_pretrained( model_name, torch_dtype=torch_dtype, @@ -1017,10 +949,10 @@ class HfRunner: def get_inputs( self, prompts: list[str], - images: Optional[PromptImageInput] = None, - videos: Optional[PromptVideoInput] = None, - audios: Optional[PromptAudioInput] = None, - ) -> list[Union[BatchFeature, BatchEncoding]]: + images: PromptImageInput | None = None, + videos: PromptVideoInput | None = None, + audios: PromptAudioInput | None = None, + ) -> list[BatchFeature | BatchEncoding]: if images is not None: assert len(prompts) == len(images) @@ -1030,7 +962,7 @@ class HfRunner: if audios is not None: assert len(prompts) == len(audios) - all_inputs: list[Union[BatchFeature, BatchEncoding]] = [] + all_inputs: list[BatchFeature | BatchEncoding] = [] for i, prompt in enumerate(prompts): processor_kwargs: dict[str, Any] = { "text": prompt, @@ -1076,16 +1008,11 @@ class HfRunner: return outputs - def encode(self, prompts: list[str], *args, - **kwargs) -> list[list[torch.Tensor]]: + def encode(self, prompts: list[str], *args, **kwargs) -> list[list[torch.Tensor]]: return self.model.encode(prompts, *args, **kwargs) - def predict(self, prompts: list[list[str]], *args, - **kwargs) -> torch.Tensor: - return self.model.predict(prompts, - *args, - convert_to_tensor=True, - **kwargs) + def predict(self, prompts: list[list[str]], *args, **kwargs) -> torch.Tensor: + return self.model.predict(prompts, *args, convert_to_tensor=True, **kwargs) def __enter__(self): return self @@ -1103,22 +1030,25 @@ def ilama_lora_files(): @pytest.fixture(scope="session") def llama32_lora_files(): from huggingface_hub import snapshot_download as hf_snapshot_download + return hf_snapshot_download(repo_id="jeeejeee/llama32-3b-text2sql-spider", local_files_only=True) def qwen_prompt(questions: list[str]) -> list[str]: placeholder = "<|image_pad|>" - return [("<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" - f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>" - f"{q}<|im_end|>\n<|im_start|>assistant\n") for q in questions] + return [ + ( + "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" + f"<|im_start|>user\n<|vision_start|>{placeholder}<|vision_end|>" + f"{q}<|im_end|>\n<|im_start|>assistant\n" + ) + for q in questions + ] def hunyuan_prompt(questions: list[str]) -> list[str]: placeholder = "<|hy_place▁holder▁no▁100|><|hy_place▁holder▁no▁102|><|hy_place▁holder▁no▁101|>" # noqa: E501 - return [ - f"<|hy_begin▁of▁sentence|>{placeholder}{question}<|hy_User|>" - for question in questions - ] + return [f"<|hy_begin▁of▁sentence|>{placeholder}{question}<|hy_User|>" for question in questions] PROMPT_CONFIGS = { diff --git a/tests/e2e/model_utils.py b/tests/e2e/model_utils.py index 3c3f6220..deb58863 100644 --- a/tests/e2e/model_utils.py +++ b/tests/e2e/model_utils.py @@ -17,11 +17,11 @@ # Adapted from vllm-project/vllm/blob/main/tests/models/utils.py # -from typing import Dict, List, Optional, Sequence, Tuple, Union +from collections.abc import Sequence from vllm.logprobs import PromptLogprobs, SampleLogprobs -TokensText = Tuple[List[int], str] +TokensText = tuple[list[int], str] def check_outputs_equal( @@ -37,18 +37,18 @@ def check_outputs_equal( """ assert len(outputs_0_lst) == len(outputs_1_lst) - for prompt_idx, (outputs_0, - outputs_1) in enumerate(zip(outputs_0_lst, - outputs_1_lst)): + for prompt_idx, (outputs_0, outputs_1) in enumerate(zip(outputs_0_lst, outputs_1_lst)): output_ids_0, output_str_0 = outputs_0 output_ids_1, output_str_1 = outputs_1 # The text and token outputs should exactly match - fail_msg = (f"Test{prompt_idx}:" - f"\n{name_0}:\t{output_str_0!r}" - f"\n{name_1}:\t{output_str_1!r}" - f"\n{name_0}:\t{output_ids_0!r}" - f"\n{name_1}:\t{output_ids_1!r}") + fail_msg = ( + f"Test{prompt_idx}:" + f"\n{name_0}:\t{output_str_0!r}" + f"\n{name_1}:\t{output_str_1!r}" + f"\n{name_0}:\t{output_ids_0!r}" + f"\n{name_1}:\t{output_ids_1!r}" + ) assert output_str_0 == output_str_1, fail_msg assert output_ids_0 == output_ids_1, fail_msg @@ -60,9 +60,7 @@ def check_outputs_equal( # * List of top sample logprobs for each sampled token # # Assumes prompt logprobs were not requested. -TokensTextLogprobs = Tuple[List[int], str, Optional[Union[List[Dict[int, - float]], - SampleLogprobs]]] +TokensTextLogprobs = tuple[list[int], str, list[dict[int, float]] | SampleLogprobs | None] # Representation of generated sequence as a tuple of # * Token ID list @@ -71,6 +69,9 @@ TokensTextLogprobs = Tuple[List[int], str, Optional[Union[List[Dict[int, # * Optional list of top prompt logprobs for each prompt token # # Allows prompt logprobs to be requested. -TokensTextLogprobsPromptLogprobs = Tuple[ - List[int], str, Optional[Union[List[Dict[int, float]], SampleLogprobs]], - Optional[Union[List[Optional[Dict[int, float]]], PromptLogprobs]]] +TokensTextLogprobsPromptLogprobs = tuple[ + list[int], + str, + list[dict[int, float]] | SampleLogprobs | None, + list[dict[int, float] | None] | PromptLogprobs | None, +] diff --git a/tests/e2e/models/conftest.py b/tests/e2e/models/conftest.py index 9370ff8c..0d339922 100644 --- a/tests/e2e/models/conftest.py +++ b/tests/e2e/models/conftest.py @@ -55,16 +55,12 @@ def report_dir(pytestconfig): def pytest_generate_tests(metafunc): if "config_filename" in metafunc.fixturenames: - if metafunc.config.getoption("--config-list-file"): rel_path = metafunc.config.getoption("--config-list-file") config_list_file = Path(rel_path).resolve() config_dir = config_list_file.parent with open(config_list_file, encoding="utf-8") as f: - configs = [ - config_dir / line.strip() for line in f - if line.strip() and not line.startswith("#") - ] + configs = [config_dir / line.strip() for line in f if line.strip() and not line.startswith("#")] metafunc.parametrize("config_filename", configs) else: single_config = metafunc.config.getoption("--config") diff --git a/tests/e2e/models/test_lm_eval_correctness.py b/tests/e2e/models/test_lm_eval_correctness.py index 3d0ce6be..8c7e093b 100644 --- a/tests/e2e/models/test_lm_eval_correctness.py +++ b/tests/e2e/models/test_lm_eval_correctness.py @@ -24,16 +24,15 @@ class EnvConfig: @pytest.fixture def env_config() -> EnvConfig: - return EnvConfig(vllm_version=os.getenv('VLLM_VERSION', 'unknown'), - vllm_commit=os.getenv('VLLM_COMMIT', 'unknown'), - vllm_ascend_version=os.getenv('VLLM_ASCEND_VERSION', - 'unknown'), - vllm_ascend_commit=os.getenv('VLLM_ASCEND_COMMIT', - 'unknown'), - cann_version=os.getenv('CANN_VERSION', 'unknown'), - torch_version=os.getenv('TORCH_VERSION', 'unknown'), - torch_npu_version=os.getenv('TORCH_NPU_VERSION', - 'unknown')) + return EnvConfig( + vllm_version=os.getenv("VLLM_VERSION", "unknown"), + vllm_commit=os.getenv("VLLM_COMMIT", "unknown"), + vllm_ascend_version=os.getenv("VLLM_ASCEND_VERSION", "unknown"), + vllm_ascend_commit=os.getenv("VLLM_ASCEND_COMMIT", "unknown"), + cann_version=os.getenv("CANN_VERSION", "unknown"), + torch_version=os.getenv("TORCH_VERSION", "unknown"), + torch_npu_version=os.getenv("TORCH_NPU_VERSION", "unknown"), + ) def build_model_args(eval_config, tp_size): @@ -48,9 +47,13 @@ def build_model_args(eval_config, tp_size): "max_model_len": max_model_len, } for s in [ - "max_images", "gpu_memory_utilization", "enable_expert_parallel", - "tensor_parallel_size", "enforce_eager", "enable_thinking", - "quantization" + "max_images", + "gpu_memory_utilization", + "enable_expert_parallel", + "tensor_parallel_size", + "enforce_eager", + "enable_thinking", + "quantization", ]: val = eval_config.get(s, None) if val is not None: @@ -68,7 +71,7 @@ def generate_report(tp_size, eval_config, report_data, report_dir, env_config): model_args = build_model_args(eval_config, tp_size) parallel_mode = f"TP{model_args.get('tensor_parallel_size', 1)}" - if model_args.get('enable_expert_parallel', False): + if model_args.get("enable_expert_parallel", False): parallel_mode += " + EP" execution_model = f"{'Eager' if model_args.get('enforce_eager', False) else 'ACLGraph'}" @@ -93,17 +96,16 @@ def generate_report(tp_size, eval_config, report_data, report_dir, env_config): num_fewshot=eval_config.get("num_fewshot", "N/A"), rows=report_data["rows"], parallel_mode=parallel_mode, - execution_model=execution_model) + execution_model=execution_model, + ) - report_output = os.path.join( - report_dir, f"{os.path.basename(eval_config['model_name'])}.md") + report_output = os.path.join(report_dir, f"{os.path.basename(eval_config['model_name'])}.md") os.makedirs(os.path.dirname(report_output), exist_ok=True) - with open(report_output, 'w', encoding='utf-8') as f: + with open(report_output, "w", encoding="utf-8") as f: f.write(report_content) -def test_lm_eval_correctness_param(config_filename, tp_size, report_dir, - env_config): +def test_lm_eval_correctness_param(config_filename, tp_size, report_dir, env_config): eval_config = yaml.safe_load(config_filename.read_text(encoding="utf-8")) model_args = build_model_args(eval_config, tp_size) success = True @@ -135,25 +137,26 @@ def test_lm_eval_correctness_param(config_filename, tp_size, report_dir, metric_name = metric["name"] ground_truth = metric["value"] measured_value = round(task_result[metric_name], 4) - task_success = bool( - np.isclose(ground_truth, measured_value, rtol=RTOL)) + task_success = bool(np.isclose(ground_truth, measured_value, rtol=RTOL)) success = success and task_success - print(f"{task_name} | {metric_name}: " - f"ground_truth={ground_truth} | measured={measured_value} | " - f"success={'✅' if task_success else '❌'}") + print( + f"{task_name} | {metric_name}: " + f"ground_truth={ground_truth} | measured={measured_value} | " + f"success={'✅' if task_success else '❌'}" + ) - report_data["rows"].append({ - "task": - task_name, - "metric": - metric_name, - "value": - f"✅{measured_value}" if success else f"❌{measured_value}", - "stderr": - task_result[ - metric_name.replace(',', '_stderr,') if metric_name == - "acc,none" else metric_name.replace(',', '_stderr,')] - }) + report_data["rows"].append( + { + "task": task_name, + "metric": metric_name, + "value": f"✅{measured_value}" if success else f"❌{measured_value}", + "stderr": task_result[ + metric_name.replace(",", "_stderr,") + if metric_name == "acc,none" + else metric_name.replace(",", "_stderr,") + ], + } + ) generate_report(tp_size, eval_config, report_data, report_dir, env_config) assert success diff --git a/tests/e2e/multicard/2-cards/spec_decode/test_spec_decode.py b/tests/e2e/multicard/2-cards/spec_decode/test_spec_decode.py index 89760a4e..cc1427aa 100644 --- a/tests/e2e/multicard/2-cards/spec_decode/test_spec_decode.py +++ b/tests/e2e/multicard/2-cards/spec_decode/test_spec_decode.py @@ -18,15 +18,12 @@ from __future__ import annotations -import math import os -import random -from typing import Any, Union from unittest.mock import patch import pytest from transformers import AutoTokenizer -from vllm import LLM, SamplingParams +from vllm import SamplingParams from vllm.config import CompilationConfig from vllm.v1.metrics.reader import Counter, Vector @@ -101,7 +98,8 @@ def test_eagle3_sp_acceptance( [prompt], tokenize=False, add_generation_prompt=True, - ) for prompt in prompts + ) + for prompt in prompts ] speculative_config = { @@ -112,21 +110,20 @@ def test_eagle3_sp_acceptance( "model": spec_model_name, } - compilation_config = CompilationConfig(cudagraph_mode="FULL_DECODE_ONLY", - cudagraph_capture_sizes=[12]) + compilation_config = CompilationConfig(cudagraph_mode="FULL_DECODE_ONLY", cudagraph_capture_sizes=[12]) with VllmRunner( - main_model_name, - enforce_eager=True, - max_model_len=8192, - disable_log_stats=False, - tensor_parallel_size=2, - max_num_seqs=256, - distributed_executor_backend="mp", - gpu_memory_utilization=0.7, - speculative_config=speculative_config, - compilation_config=compilation_config, - async_scheduling=async_scheduling, + main_model_name, + enforce_eager=True, + max_model_len=8192, + disable_log_stats=False, + tensor_parallel_size=2, + max_num_seqs=256, + distributed_executor_backend="mp", + gpu_memory_utilization=0.7, + speculative_config=speculative_config, + compilation_config=compilation_config, + async_scheduling=async_scheduling, ) as llm: _ = llm.generate(prompts, sampling_params) metrics = llm.model.get_metrics() @@ -142,10 +139,7 @@ def test_eagle3_sp_acceptance( for pos in range(len(metric.values)): num_accepted_tokens_per_pos[pos] += metric.values[pos] - acceptance_per_pos = [ - num_accepted_tokens / num_drafts - for num_accepted_tokens in num_accepted_tokens_per_pos - ] + acceptance_per_pos = [num_accepted_tokens / num_drafts for num_accepted_tokens in num_accepted_tokens_per_pos] golden = BASELINES_SP[method] match = all(abs(a - b) < 0.06 for a, b in zip(acceptance_per_pos, golden)) diff --git a/tests/e2e/multicard/2-cards/test_aclgraph_capture_replay.py b/tests/e2e/multicard/2-cards/test_aclgraph_capture_replay.py index 38a78b65..6eb9db91 100644 --- a/tests/e2e/multicard/2-cards/test_aclgraph_capture_replay.py +++ b/tests/e2e/multicard/2-cards/test_aclgraph_capture_replay.py @@ -25,8 +25,8 @@ import pytest import torch from vllm.utils.network_utils import get_open_port -from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type from tests.e2e.conftest import wait_until_npu_memory_free +from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type MODELS = [ # Offline data parallel mode will be not supported/useful for dense models @@ -58,8 +58,7 @@ def _install_spies(counters: dict[str, Any]) -> contextlib.ExitStack: ] for cls, method, counter in hooks: - stack.enter_context( - patch.object(cls, method, make_spy(cls, method, counter))) + stack.enter_context(patch.object(cls, method, make_spy(cls, method, counter))) return stack @@ -75,18 +74,19 @@ def _run_worker_process( max_tokens: int, ): """Main entry point for the worker process.""" - os.environ.update({ - "VLLM_DP_RANK": str(rank), - "VLLM_DP_RANK_LOCAL": str(local_rank), - "VLLM_DP_SIZE": str(world_size), - "VLLM_DP_MASTER_IP": master_ip, - "VLLM_DP_MASTER_PORT": str(master_port), - }) + os.environ.update( + { + "VLLM_DP_RANK": str(rank), + "VLLM_DP_RANK_LOCAL": str(local_rank), + "VLLM_DP_SIZE": str(world_size), + "VLLM_DP_MASTER_IP": master_ip, + "VLLM_DP_MASTER_PORT": str(master_port), + } + ) # Import vLLM only after environment setup from vllm import LLM, SamplingParams - from vllm.distributed.parallel_state import ( - destroy_distributed_environment, destroy_model_parallel) + from vllm.distributed.parallel_state import destroy_distributed_environment, destroy_model_parallel # Apply hooks and run inference with _install_spies(counters): @@ -100,23 +100,20 @@ def _run_worker_process( # Simple data sharding chunk_size = len(prompts) // world_size start_idx = rank * chunk_size - end_idx = start_idx + chunk_size if rank < world_size - 1 else len( - prompts) + end_idx = start_idx + chunk_size if rank < world_size - 1 else len(prompts) local_prompts = prompts[start_idx:end_idx] llm = LLM( model=model_path, quantization="ascend" if "W8A8" in model_path else None, - enable_expert_parallel=True if "DeepSeek" in model_path else False, + enable_expert_parallel="DeepSeek" in model_path, trust_remote_code=True, ) # Expose model config to the main test process - counters["hidden_layers"].value = ( - llm.llm_engine.model_config.hf_text_config.num_hidden_layers) + counters["hidden_layers"].value = llm.llm_engine.model_config.hf_text_config.num_hidden_layers - llm.generate(local_prompts, - SamplingParams(max_tokens=max_tokens, temperature=0.0)) + llm.generate(local_prompts, SamplingParams(max_tokens=max_tokens, temperature=0.0)) # Explicit cleanup is mandatory in multi-process vLLM tests del llm @@ -162,8 +159,7 @@ def test_models_aclgraph_capture_replay_metrics_dp2( for rank in range(dp_size): p = multiprocessing.Process( target=_run_worker_process, - args=(rank, rank, dp_size, "127.0.0.1", port, counters, model, - max_tokens), + args=(rank, rank, dp_size, "127.0.0.1", port, counters, model, max_tokens), ) p.start() workers.append(p) @@ -175,8 +171,7 @@ def test_models_aclgraph_capture_replay_metrics_dp2( for k in workers: if k.is_alive(): k.kill() - raise RuntimeError( - f"Worker {p.pid} failed with exit code {p.exitcode}") + raise RuntimeError(f"Worker {p.pid} failed with exit code {p.exitcode}") actual_capture = counters["capture"].value actual_replay = counters["replay"].value @@ -185,18 +180,16 @@ def test_models_aclgraph_capture_replay_metrics_dp2( num_layers = counters["hidden_layers"].value num_acl_graphs = num_layers + 1 - num_comm_groups = sum(1 for s in [dp_size, 1] - if s > 1) # dp_size=2, tp_size=1 + num_comm_groups = sum(1 for s in [dp_size, 1] if s > 1) # dp_size=2, tp_size=1 # Metric 1: Graph Capture (ACL Graph Construction) # Ref: vllm_ascend.utils.update_aclgraph_sizes - max_batch_sizes = math.floor((1800 - num_comm_groups * 40) / - num_acl_graphs / (1 + num_comm_groups * 2)) + max_batch_sizes = math.floor((1800 - num_comm_groups * 40) / num_acl_graphs / (1 + num_comm_groups * 2)) expected_capture = max_batch_sizes * num_acl_graphs * dp_size - assert ( - actual_capture == expected_capture - ), f"Capture count mismatch. Expected: {expected_capture}, Got: {actual_capture}" + assert actual_capture == expected_capture, ( + f"Capture count mismatch. Expected: {expected_capture}, Got: {actual_capture}" + ) # Metric 2: Model Execution (NPUModelRunner.execute_model) # vLLM Step Breakdown: @@ -207,9 +200,9 @@ def test_models_aclgraph_capture_replay_metrics_dp2( # vllm default enables Async scheduler, this will take 1 more steps expected_exec_model = (total_steps + 1 + 1) * dp_size - assert ( - num_execute_model == expected_exec_model - ), f"Model execution count mismatch. Expected: {expected_exec_model}, Got: {num_execute_model}" + assert num_execute_model == expected_exec_model, ( + f"Model execution count mismatch. Expected: {expected_exec_model}, Got: {num_execute_model}" + ) # Metric 3: Dummy Runs (Warmup & Alignment) # vLLM synchronizes globally every 32 steps. @@ -228,14 +221,12 @@ def test_models_aclgraph_capture_replay_metrics_dp2( expected_dummy_run = (warmup_runs + padding_runs) * dp_size - assert ( - num_dummy_run == expected_dummy_run - ), f"Dummy run count mismatch. Expected: {expected_dummy_run}, Got: {num_dummy_run}" + assert num_dummy_run == expected_dummy_run, ( + f"Dummy run count mismatch. Expected: {expected_dummy_run}, Got: {num_dummy_run}" + ) # Metric 4: Graph Replay (Inference Execution) # Replays happen for every aligned step across all graphs. expected_replay = num_acl_graphs * aligned_steps * dp_size - assert ( - actual_replay == expected_replay - ), f"Replay count mismatch. Expected: {expected_replay}, Got: {actual_replay}" + assert actual_replay == expected_replay, f"Replay count mismatch. Expected: {expected_replay}, Got: {actual_replay}" diff --git a/tests/e2e/multicard/2-cards/test_data_parallel.py b/tests/e2e/multicard/2-cards/test_data_parallel.py index c197181b..32927755 100644 --- a/tests/e2e/multicard/2-cards/test_data_parallel.py +++ b/tests/e2e/multicard/2-cards/test_data_parallel.py @@ -64,12 +64,8 @@ def test_qwen3_inference_dp2(model, max_tokens): cmd.append("ascend") print(f"Running subprocess: {' '.join(cmd)}") - proc = subprocess.run(cmd, - env=env, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - timeout=600) - output = proc.stdout.decode(errors='ignore') + proc = subprocess.run(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=600) + output = proc.stdout.decode(errors="ignore") print(output) diff --git a/tests/e2e/multicard/2-cards/test_disaggregated_encoder.py b/tests/e2e/multicard/2-cards/test_disaggregated_encoder.py index 6d54a318..ec5ca3a4 100644 --- a/tests/e2e/multicard/2-cards/test_disaggregated_encoder.py +++ b/tests/e2e/multicard/2-cards/test_disaggregated_encoder.py @@ -27,6 +27,7 @@ MODELS = [ SHARED_STORAGE_PATH = "/dev/shm/epd/storage" TENSOR_PARALLELS = [1] + @pytest.mark.asyncio @pytest.mark.parametrize("model", MODELS) @pytest.mark.parametrize("tp_size", TENSOR_PARALLELS) @@ -36,36 +37,61 @@ async def test_models(model: str, tp_size: int) -> None: vllm_server_args = [ [ "--port", - str(encode_port), "--model", model, "--gpu-memory-utilization", - "0.01", "--tensor-parallel-size", - str(tp_size), "--enforce-eager", "--no-enable-prefix-caching", - "--max-model-len", "10000", "--max-num-batched-tokens", "10000", - "--max-num-seqs", "1", "--ec-transfer-config", - '{"ec_connector_extra_config":{"shared_storage_path":"' + - SHARED_STORAGE_PATH + - '"},"ec_connector":"ECExampleConnector","ec_role": "ec_producer"}' + str(encode_port), + "--model", + model, + "--gpu-memory-utilization", + "0.01", + "--tensor-parallel-size", + str(tp_size), + "--enforce-eager", + "--no-enable-prefix-caching", + "--max-model-len", + "10000", + "--max-num-batched-tokens", + "10000", + "--max-num-seqs", + "1", + "--ec-transfer-config", + '{"ec_connector_extra_config":{"shared_storage_path":"' + + SHARED_STORAGE_PATH + + '"},"ec_connector":"ECExampleConnector","ec_role": "ec_producer"}', ], [ "--port", - str(pd_port), "--model", model, "--gpu-memory-utilization", "0.95", + str(pd_port), + "--model", + model, + "--gpu-memory-utilization", + "0.95", "--tensor-parallel-size", - str(tp_size), "--enforce-eager", "--max-model-len", "10000", - "--max-num-batched-tokens", "10000", "--max-num-seqs", "128", + str(tp_size), + "--enforce-eager", + "--max-model-len", + "10000", + "--max-num-batched-tokens", + "10000", + "--max-num-seqs", + "128", "--ec-transfer-config", - '{"ec_connector_extra_config":{"shared_storage_path":"' + - SHARED_STORAGE_PATH + - '"},"ec_connector":"ECExampleConnector","ec_role": "ec_consumer"}' - ] + '{"ec_connector_extra_config":{"shared_storage_path":"' + + SHARED_STORAGE_PATH + + '"},"ec_connector":"ECExampleConnector","ec_role": "ec_consumer"}', + ], ] proxy_port = get_open_port() proxy_args = [ - "--host", "127.0.0.1", "--port", - str(proxy_port), "--encode-servers-urls", - f"http://localhost:{encode_port}", "--decode-servers-urls", - f"http://localhost:{pd_port}", "--prefill-servers-urls", "disable" + "--host", + "127.0.0.1", + "--port", + str(proxy_port), + "--encode-servers-urls", + f"http://localhost:{encode_port}", + "--decode-servers-urls", + f"http://localhost:{pd_port}", + "--prefill-servers-urls", + "disable", ] - with RemoteEPDServer(vllm_serve_args=vllm_server_args) as _: - with DisaggEpdProxy(proxy_args=proxy_args) as proxy: - send_image_request(model, proxy) - + with RemoteEPDServer(vllm_serve_args=vllm_server_args) as _, DisaggEpdProxy(proxy_args=proxy_args) as proxy: + send_image_request(model, proxy) diff --git a/tests/e2e/multicard/2-cards/test_expert_parallel.py b/tests/e2e/multicard/2-cards/test_expert_parallel.py index 86a46d8d..f53de536 100644 --- a/tests/e2e/multicard/2-cards/test_expert_parallel.py +++ b/tests/e2e/multicard/2-cards/test_expert_parallel.py @@ -15,15 +15,12 @@ def test_deepseek_correctness_ep(model_name): max_tokens = 5 # FIXME: Really strange that chunked prefill might lead to different results, investigate further - with VllmRunner(model_name, - cudagraph_capture_sizes=[1, 2, 4, 8], - tensor_parallel_size=2) as vllm_model: + with VllmRunner(model_name, cudagraph_capture_sizes=[1, 2, 4, 8], tensor_parallel_size=2) as vllm_model: tp_output = vllm_model.generate_greedy(example_prompts, max_tokens) - with VllmRunner(model_name, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - enable_expert_parallel=True) as vllm_model: + with VllmRunner( + model_name, tensor_parallel_size=2, cudagraph_capture_sizes=[1, 2, 4, 8], enable_expert_parallel=True + ) as vllm_model: ep_output = vllm_model.generate_greedy(example_prompts, max_tokens) check_outputs_equal( diff --git a/tests/e2e/multicard/2-cards/test_external_launcher.py b/tests/e2e/multicard/2-cards/test_external_launcher.py index ff398232..e74b3585 100644 --- a/tests/e2e/multicard/2-cards/test_external_launcher.py +++ b/tests/e2e/multicard/2-cards/test_external_launcher.py @@ -29,6 +29,7 @@ from unittest.mock import patch import pytest import torch_npu from modelscope import snapshot_download # type: ignore + from tests.e2e.conftest import wait_until_npu_memory_free MODELS = ["Qwen/Qwen3-0.6B"] @@ -39,9 +40,7 @@ DEVICE_NAME = torch_npu.npu.get_device_name(0)[:10] @pytest.mark.parametrize("model", MODELS) @patch.dict(os.environ, {"HCCL_BUFFSIZE": "500"}) def test_qwen3_external_launcher(model): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() # TODO: Change to 2 when ci machine has 4 cards cmd = [ @@ -68,7 +67,7 @@ def test_qwen3_external_launcher(model): stderr=subprocess.STDOUT, timeout=600, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) @@ -81,16 +80,24 @@ def test_qwen3_external_launcher(model): @pytest.mark.parametrize("model", MOE_MODELS) @wait_until_npu_memory_free() def test_qwen3_moe_external_launcher_ep_tp2(model): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() # TODO: Change to 2 when ci machine has 4 cards cmd = [ sys.executable, - str(script), "--model", model, "--tp-size", "2", "--node-size", "1", - "--node-rank", "0", "--proc-per-node", "2", "--trust-remote-code", - "--enable-expert-parallel" + str(script), + "--model", + model, + "--tp-size", + "2", + "--node-size", + "1", + "--node-rank", + "0", + "--proc-per-node", + "2", + "--trust-remote-code", + "--enable-expert-parallel", ] print(f"Running subprocess: {' '.join(cmd)}") @@ -101,7 +108,7 @@ def test_qwen3_moe_external_launcher_ep_tp2(model): stderr=subprocess.STDOUT, timeout=600, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) @@ -113,9 +120,7 @@ def test_qwen3_moe_external_launcher_ep_tp2(model): @patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"}) @wait_until_npu_memory_free() def test_qwen3_external_launcher_with_sleepmode(): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() # TODO: Change to 2 when ci machine has 4 cards cmd = [ @@ -147,7 +152,7 @@ def test_qwen3_external_launcher_with_sleepmode(): stderr=subprocess.STDOUT, timeout=300, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) @@ -158,9 +163,7 @@ def test_qwen3_external_launcher_with_sleepmode(): @patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"}) def test_qwen3_external_launcher_with_sleepmode_level2(): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() model_path = snapshot_download("Qwen/Qwen3-8B") # TODO: Add moe model test @@ -195,7 +198,7 @@ def test_qwen3_external_launcher_with_sleepmode_level2(): stderr=subprocess.STDOUT, timeout=300, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) @@ -210,14 +213,9 @@ def test_qwen3_external_launcher_with_sleepmode_level2(): ) @pytest.mark.parametrize("model", MODELS) @wait_until_npu_memory_free() -@patch.dict(os.environ, { - "VLLM_ASCEND_ENABLE_MATMUL_ALLREDUCE": "1", - "HCCL_BUFFSIZE": "500" -}) +@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_MATMUL_ALLREDUCE": "1", "HCCL_BUFFSIZE": "500"}) def test_qwen3_external_launcher_with_matmul_allreduce(model): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() cmd = [ sys.executable, @@ -236,7 +234,7 @@ def test_qwen3_external_launcher_with_matmul_allreduce(model): timeout=600, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) assert "Generated text:" in output diff --git a/tests/e2e/multicard/2-cards/test_full_graph_mode.py b/tests/e2e/multicard/2-cards/test_full_graph_mode.py index d96834fb..31a05ee3 100644 --- a/tests/e2e/multicard/2-cards/test_full_graph_mode.py +++ b/tests/e2e/multicard/2-cards/test_full_graph_mode.py @@ -26,41 +26,39 @@ from tests.e2e.model_utils import check_outputs_equal def test_qwen3_moe_full_decode_only_tp2(): - if 'HCCL_OP_EXPANSION_MODE' in os.environ: - del os.environ['HCCL_OP_EXPANSION_MODE'] + if "HCCL_OP_EXPANSION_MODE" in os.environ: + del os.environ["HCCL_OP_EXPANSION_MODE"] prompts = [ - "Hello, my name is", "The president of the United States is", - "The capital of France is", "The future of AI is" + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", ] model = "Qwen/Qwen3-30B-A3B" sampling_params = SamplingParams(max_tokens=32, temperature=0.0) - with VllmRunner(model, - max_model_len=1024, - tensor_parallel_size=2, - compilation_config={ - "cudagraph_mode": "FULL_DECODE_ONLY", - "cudagraph_capture_sizes": [4, 8, 24, 48, 60] - }) as runner: - vllm_fullgraph_outputs = runner.model.generate(prompts, - sampling_params) + with VllmRunner( + model, + max_model_len=1024, + tensor_parallel_size=2, + compilation_config={"cudagraph_mode": "FULL_DECODE_ONLY", "cudagraph_capture_sizes": [4, 8, 24, 48, 60]}, + ) as runner: + vllm_fullgraph_outputs = runner.model.generate(prompts, sampling_params) with VllmRunner( - model, - max_model_len=1024, - cudagraph_capture_sizes=[4, 8, 24, 48, 60], - tensor_parallel_size=2, + model, + max_model_len=1024, + cudagraph_capture_sizes=[4, 8, 24, 48, 60], + tensor_parallel_size=2, ) as runner: vllm_eager_outputs = runner.model.generate(prompts, sampling_params) vllm_fullgraph_outputs_list = [] for output in vllm_fullgraph_outputs: - vllm_fullgraph_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + vllm_fullgraph_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) vllm_eager_outputs_list = [] for output in vllm_eager_outputs: - vllm_eager_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + vllm_eager_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) check_outputs_equal( outputs_0_lst=vllm_eager_outputs_list, @@ -72,41 +70,39 @@ def test_qwen3_moe_full_decode_only_tp2(): @pytest.mark.skip(reason="CANN8.5 failed with this test, fix me") def test_qwen3_moe_full_graph_tp2(): - if 'HCCL_OP_EXPANSION_MODE' in os.environ: - del os.environ['HCCL_OP_EXPANSION_MODE'] + if "HCCL_OP_EXPANSION_MODE" in os.environ: + del os.environ["HCCL_OP_EXPANSION_MODE"] prompts = [ - "Hello, my name is", "The president of the United States is", - "The capital of France is", "The future of AI is" + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", ] model = "Qwen/Qwen3-30B-A3B" sampling_params = SamplingParams(max_tokens=32, temperature=0.0) - with VllmRunner(model, - max_model_len=1024, - tensor_parallel_size=2, - compilation_config={ - "cudagraph_mode": "FULL", - "cudagraph_capture_sizes": [4, 8, 24, 48, 60] - }) as runner: - vllm_fullgraph_outputs = runner.model.generate(prompts, - sampling_params) + with VllmRunner( + model, + max_model_len=1024, + tensor_parallel_size=2, + compilation_config={"cudagraph_mode": "FULL", "cudagraph_capture_sizes": [4, 8, 24, 48, 60]}, + ) as runner: + vllm_fullgraph_outputs = runner.model.generate(prompts, sampling_params) with VllmRunner( - model, - max_model_len=1024, - cudagraph_capture_sizes=[4, 8, 24, 48, 60], - tensor_parallel_size=2, + model, + max_model_len=1024, + cudagraph_capture_sizes=[4, 8, 24, 48, 60], + tensor_parallel_size=2, ) as runner: vllm_eager_outputs = runner.model.generate(prompts, sampling_params) vllm_fullgraph_outputs_list = [] for output in vllm_fullgraph_outputs: - vllm_fullgraph_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + vllm_fullgraph_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) vllm_eager_outputs_list = [] for output in vllm_eager_outputs: - vllm_eager_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + vllm_eager_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) check_outputs_equal( outputs_0_lst=vllm_eager_outputs_list, diff --git a/tests/e2e/multicard/2-cards/test_ilama_lora_tp2.py b/tests/e2e/multicard/2-cards/test_ilama_lora_tp2.py index fc4866ec..e0f70ce6 100644 --- a/tests/e2e/multicard/2-cards/test_ilama_lora_tp2.py +++ b/tests/e2e/multicard/2-cards/test_ilama_lora_tp2.py @@ -1,23 +1,22 @@ import pytest from tests.e2e.conftest import VllmRunner -from tests.e2e.singlecard.test_ilama_lora import (EXPECTED_LORA_OUTPUT, - MODEL_PATH, do_sample) +from tests.e2e.singlecard.test_ilama_lora import EXPECTED_LORA_OUTPUT, MODEL_PATH, do_sample @pytest.mark.parametrize("distributed_executor_backend", ["mp"]) def test_ilama_lora_tp2(distributed_executor_backend, ilama_lora_files): with VllmRunner( - MODEL_PATH, - enable_lora=True, - max_loras=4, - dtype="half", - max_model_len=1024, - max_num_seqs=16, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - distributed_executor_backend=distributed_executor_backend, - enforce_eager=True, + MODEL_PATH, + enable_lora=True, + max_loras=4, + dtype="half", + max_model_len=1024, + max_num_seqs=16, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + distributed_executor_backend=distributed_executor_backend, + enforce_eager=True, ) as vllm_model: output = do_sample(vllm_model.model, ilama_lora_files, lora_id=2) diff --git a/tests/e2e/multicard/2-cards/test_offline_inference_distributed.py b/tests/e2e/multicard/2-cards/test_offline_inference_distributed.py index 11523180..fae23fa1 100644 --- a/tests/e2e/multicard/2-cards/test_offline_inference_distributed.py +++ b/tests/e2e/multicard/2-cards/test_offline_inference_distributed.py @@ -20,8 +20,10 @@ Run `pytest tests/test_offline_inference.py`. """ + import os from unittest.mock import patch + import pytest from vllm import SamplingParams @@ -51,6 +53,7 @@ GPT_OSS_MODELS = [ "unsloth/gpt-oss-20b-BF16", ] + def test_deepseek_multistream_moe_tp2(): example_prompts = [ "Hello, my name is", @@ -58,15 +61,15 @@ def test_deepseek_multistream_moe_tp2(): dtype = "half" max_tokens = 5 with VllmRunner( - "vllm-ascend/DeepSeek-V3-Pruning", - dtype=dtype, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - distributed_executor_backend="mp", - additional_config={ - "enable_multistream_moe": True, - "refresh": True, - }, + "vllm-ascend/DeepSeek-V3-Pruning", + dtype=dtype, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + distributed_executor_backend="mp", + additional_config={ + "enable_multistream_moe": True, + "refresh": True, + }, ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -78,12 +81,12 @@ def test_qwen3_w4a8_dynamic_tp2(model): ] max_tokens = 5 with VllmRunner( - model, - max_model_len=8192, - dtype="auto", - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", + model, + max_model_len=8192, + dtype="auto", + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", ) as vllm_model: vllm_model.generate_greedy(prompts, max_tokens) @@ -92,20 +95,17 @@ def test_qwen3_moe_sp_tp2() -> None: example_prompts = [ "Hello, my name is", ] - sampling_params = SamplingParams(max_tokens=5, - temperature=0.0, - top_k=50, - top_p=0.9) + sampling_params = SamplingParams(max_tokens=5, temperature=0.0, top_k=50, top_p=0.9) - with VllmRunner("Qwen/Qwen3-30B-A3B", - dtype="auto", - tensor_parallel_size=2, - distributed_executor_backend="mp", - compilation_config={"pass_config": { - "enable_sp": True - }}, - enable_expert_parallel=True, - enforce_eager=True) as vllm_model: + with VllmRunner( + "Qwen/Qwen3-30B-A3B", + dtype="auto", + tensor_parallel_size=2, + distributed_executor_backend="mp", + compilation_config={"pass_config": {"enable_sp": True}}, + enable_expert_parallel=True, + enforce_eager=True, + ) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -113,33 +113,34 @@ def test_qwen3_moe_sp_tp2() -> None: @patch.dict(os.environ, {"HCCL_BUFFSIZE": "2048"}) def test_deepseek_w4a8_accuracy_tp2(model): prompts = [ - "Hello, my name is", "The president of the United States is", - "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs" - ] - vllm_ds_w4a8_answers = [ - '逍遙而至地去 accrued', '平行于我udo madreHelen', 'ysteepaolis backwards Kj' + "Hello, my name is", + "The president of the United States is", + "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs", ] + vllm_ds_w4a8_answers = ["逍遙而至地去 accrued", "平行于我udo madreHelen", "ysteepaolis backwards Kj"] sampling_params = SamplingParams(max_tokens=5, temperature=0.0) - with VllmRunner(model, - dtype="auto", - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", - enable_expert_parallel=True) as vllm_model: - vllm_quant_outputs = vllm_model.model.generate(prompts, - sampling_params) + with VllmRunner( + model, + dtype="auto", + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", + enable_expert_parallel=True, + ) as vllm_model: + vllm_quant_outputs = vllm_model.model.generate(prompts, sampling_params) vllm_quant_outputs_list = [] for output in vllm_quant_outputs: - vllm_quant_outputs_list.append( - ([output.outputs[0].index], output.outputs[0].text)) + vllm_quant_outputs_list.append(([output.outputs[0].index], output.outputs[0].text)) vllm_answer_list = [] - vllm_answer_list = ([([0], answer) for answer in vllm_ds_w4a8_answers]) + vllm_answer_list = [([0], answer) for answer in vllm_ds_w4a8_answers] - check_outputs_equal(outputs_0_lst=vllm_answer_list, - outputs_1_lst=vllm_quant_outputs_list, - name_0="vllm_quant_outputs", - name_1="vllm_answer_outputs") + check_outputs_equal( + outputs_0_lst=vllm_answer_list, + outputs_1_lst=vllm_quant_outputs_list, + name_0="vllm_quant_outputs", + name_1="vllm_answer_outputs", + ) @patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_FLASHCOMM1": "1"}) @@ -148,17 +149,16 @@ def test_qwen3_moe_fc2_tp2() -> None: example_prompts = [ "Hello, my name is", ] - sampling_params = SamplingParams(max_tokens=5, - temperature=0.0, - top_k=50, - top_p=0.9) + sampling_params = SamplingParams(max_tokens=5, temperature=0.0, top_k=50, top_p=0.9) - with VllmRunner("Qwen/Qwen3-30B-A3B", - dtype="auto", - tensor_parallel_size=2, - distributed_executor_backend="mp", - enable_expert_parallel=True, - enforce_eager=True) as vllm_model: + with VllmRunner( + "Qwen/Qwen3-30B-A3B", + dtype="auto", + tensor_parallel_size=2, + distributed_executor_backend="mp", + enable_expert_parallel=True, + enforce_eager=True, + ) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -168,20 +168,17 @@ def test_qwen3_moe_fc2_oshard_tp2() -> None: example_prompts = [ "Hello, my name is", ] - sampling_params = SamplingParams(max_tokens=5, - temperature=0.0, - top_k=50, - top_p=0.9) + sampling_params = SamplingParams(max_tokens=5, temperature=0.0, top_k=50, top_p=0.9) with VllmRunner( - "Qwen/Qwen3-30B-A3B", - dtype="auto", - tensor_parallel_size=2, - distributed_executor_backend="mp", - enable_expert_parallel=True, - enforce_eager= - True, # TODO(Levi-JQ): support graph mode for fc2 in Qwen - additional_config={"layer_sharding": ["o_proj"]}) as vllm_model: + "Qwen/Qwen3-30B-A3B", + dtype="auto", + tensor_parallel_size=2, + distributed_executor_backend="mp", + enable_expert_parallel=True, + enforce_eager=True, # TODO(Levi-JQ): support graph mode for fc2 in Qwen + additional_config={"layer_sharding": ["o_proj"]}, + ) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -190,17 +187,16 @@ def test_deepseek_v2_lite_fc1_tp2() -> None: example_prompts = [ "test" * 1001, ] - sampling_params = SamplingParams(max_tokens=5, - temperature=0.0, - top_k=50, - top_p=0.9) - with VllmRunner("vllm-ascend/DeepSeek-V2-Lite-W8A8", - dtype="auto", - tensor_parallel_size=2, - distributed_executor_backend="mp", - enable_expert_parallel=True, - enforce_eager=True, - quantization="ascend") as vllm_model: + sampling_params = SamplingParams(max_tokens=5, temperature=0.0, top_k=50, top_p=0.9) + with VllmRunner( + "vllm-ascend/DeepSeek-V2-Lite-W8A8", + dtype="auto", + tensor_parallel_size=2, + distributed_executor_backend="mp", + enable_expert_parallel=True, + enforce_eager=True, + quantization="ascend", + ) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -213,12 +209,12 @@ def test_qwen3_dense_fc1_tp2(model): max_tokens = 5 with VllmRunner( - model, - max_model_len=8192, - dtype="auto", - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", + model, + max_model_len=8192, + dtype="auto", + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -232,13 +228,13 @@ def test_qwen3_dense_prefetch_mlp_weight_tp2(model): max_tokens = 5 with VllmRunner( - model, - max_model_len=8192, - dtype="auto", - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", - additional_config={"weight_prefetch_config": {"enabled": True}}, + model, + max_model_len=8192, + dtype="auto", + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", + additional_config={"weight_prefetch_config": {"enabled": True}}, ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -252,28 +248,20 @@ def test_deepseek3_2_w8a8_pruning_mtp_tp2_ep(): "Hello ", ] # "max_position_embeddings": 163840, - long_example_prompts = [ - "Hello " * (163839 - 500) + "Hello" - ] + long_example_prompts = ["Hello " * (163839 - 500) + "Hello"] max_tokens = 500 - with VllmRunner("vllm-ascend/DeepSeek-V3.2-W8A8-Pruning", - tensor_parallel_size=2, - quantization="ascend", - enable_expert_parallel=True, - max_model_len=163840, - compilation_config={ - "cudagraph_capture_sizes": [2, 4, 6, 8, 10, 12], - "cudagraph_mode": "FULL_DECODE_ONLY" - }, - speculative_config={ - "num_speculative_tokens": 1, - "method": "deepseek_mtp" - }, - additional_config={ - "layer_sharding":["q_b_proj", "o_proj"] - }, - reasoning_parser="deepseek_v3", - tokenizer_mode="deepseek_v32") as vllm_model: + with VllmRunner( + "vllm-ascend/DeepSeek-V3.2-W8A8-Pruning", + tensor_parallel_size=2, + quantization="ascend", + enable_expert_parallel=True, + max_model_len=163840, + compilation_config={"cudagraph_capture_sizes": [2, 4, 6, 8, 10, 12], "cudagraph_mode": "FULL_DECODE_ONLY"}, + speculative_config={"num_speculative_tokens": 1, "method": "deepseek_mtp"}, + additional_config={"layer_sharding": ["q_b_proj", "o_proj"]}, + reasoning_parser="deepseek_v3", + tokenizer_mode="deepseek_v32", + ) as vllm_model: vllm_model.generate_greedy(short_example_prompts, max_tokens) vllm_model.generate_greedy(long_example_prompts, max_tokens) @@ -285,10 +273,10 @@ def test_qwen3_w4a4_distributed_tp2(model): ] max_tokens = 5 with VllmRunner( - model, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", + model, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -300,8 +288,8 @@ def test_gpt_oss_distributed_tp2(model): ] max_tokens = 5 with VllmRunner( - model, - tensor_parallel_size=2, - enforce_eager=True, + model, + tensor_parallel_size=2, + enforce_eager=True, ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) diff --git a/tests/e2e/multicard/2-cards/test_offline_weight_load.py b/tests/e2e/multicard/2-cards/test_offline_weight_load.py index 6d6961b0..defc2963 100644 --- a/tests/e2e/multicard/2-cards/test_offline_weight_load.py +++ b/tests/e2e/multicard/2-cards/test_offline_weight_load.py @@ -32,9 +32,7 @@ MODELS = ["Qwen/Qwen3-30B-A3B"] @pytest.mark.parametrize("model", MODELS) @patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"}) def test_qwen3_offline_load_and_sleepmode_tp2(model): - script = Path( - __file__ - ).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" + script = Path(__file__).parent.parent.parent.parent.parent / "examples" / "offline_external_launcher.py" env = os.environ.copy() cmd = [ sys.executable, @@ -65,7 +63,7 @@ def test_qwen3_offline_load_and_sleepmode_tp2(model): stderr=subprocess.STDOUT, timeout=600, ) - output = proc.stdout.decode(errors='ignore') + output = proc.stdout.decode(errors="ignore") print(output) diff --git a/tests/e2e/multicard/2-cards/test_pipeline_parallel.py b/tests/e2e/multicard/2-cards/test_pipeline_parallel.py index c2dc2d90..4023ee0e 100644 --- a/tests/e2e/multicard/2-cards/test_pipeline_parallel.py +++ b/tests/e2e/multicard/2-cards/test_pipeline_parallel.py @@ -37,12 +37,13 @@ prompts = [ @pytest.mark.parametrize("tp_size", TENSOR_PARALLELS) @pytest.mark.parametrize("pp_size", PIPELINE_PARALLELS) @pytest.mark.parametrize("distributed_executor_backend", DIST_EXECUTOR_BACKEND) -def test_models_pp2(model: str, tp_size: int, pp_size: int, - distributed_executor_backend: str) -> None: - with VllmRunner(model, - tensor_parallel_size=tp_size, - pipeline_parallel_size=pp_size, - cudagraph_capture_sizes=[1, 2, 4, 8], - distributed_executor_backend=distributed_executor_backend, - gpu_memory_utilization=0.7) as vllm_model: +def test_models_pp2(model: str, tp_size: int, pp_size: int, distributed_executor_backend: str) -> None: + with VllmRunner( + model, + tensor_parallel_size=tp_size, + pipeline_parallel_size=pp_size, + cudagraph_capture_sizes=[1, 2, 4, 8], + distributed_executor_backend=distributed_executor_backend, + gpu_memory_utilization=0.7, + ) as vllm_model: vllm_model.generate_greedy(prompts, 64) diff --git a/tests/e2e/multicard/2-cards/test_prefix_caching.py b/tests/e2e/multicard/2-cards/test_prefix_caching.py index b96a75c0..6ef5ca59 100644 --- a/tests/e2e/multicard/2-cards/test_prefix_caching.py +++ b/tests/e2e/multicard/2-cards/test_prefix_caching.py @@ -11,11 +11,14 @@ MODELS = [ # for MHA "Qwen/Qwen3-8B", # for MLA - "deepseek-ai/DeepSeek-V2-Lite-Chat" + "deepseek-ai/DeepSeek-V2-Lite-Chat", ] # A prompt containing a large markdown table. The table is randomly generated by GPT-4. -LONG_PROMPT = "You are a helpful assistant in recognizes the content of tables in markdown format. Here is a table as follows.\n# Table\n" + """ +# ruff: noqa: E501 +LONG_PROMPT = ( + "You are a helpful assistant in recognizes the content of tables in markdown format. Here is a table as follows.\n# Table\n" + + """ | ID | Name | Age | Occupation | Country | Email | Phone Number | Address | |-----|---------------|-----|---------------|---------------|------------------------|----------------|------------------------------| | 1 | John Doe | 29 | Engineer | USA | john.doe@example.com | 555-1234 | 123 Elm St, Springfield, IL | @@ -49,32 +52,34 @@ LONG_PROMPT = "You are a helpful assistant in recognizes the content of tables i | 29 | Amy White | 33 | Musician | New Zealand | amy.w@example.com | 555-5658 | 159 Maple St, Wellington, NZ | | 30 | Ben Black | 38 | Chef | Ireland | ben.b@example.com | 555-7870 | 246 Fir St, Waterford, IE | """ +) INPUT_PROMPTS = [ - LONG_PROMPT + - "Question: what is the age of John Doe? Your answer: The age of John Doe is ", - LONG_PROMPT + - "Question: what is the age of Zack Blue? Your answer: The age of Zack Blue is " + LONG_PROMPT + "Question: what is the age of John Doe? Your answer: The age of John Doe is ", + LONG_PROMPT + "Question: what is the age of Zack Blue? Your answer: The age of Zack Blue is ", ] @pytest.mark.parametrize("model", MODELS) @pytest.mark.parametrize("max_tokens", [50]) def test_models_prefix_cache_tp2(model: str, max_tokens: int) -> None: - with VllmRunner(model, - max_model_len=2048, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - gpu_memory_utilization=0.7) as vllm_model: - prefix_cache_output = vllm_model.generate_greedy( - INPUT_PROMPTS, max_tokens) + with VllmRunner( + model, + max_model_len=2048, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + gpu_memory_utilization=0.7, + ) as vllm_model: + prefix_cache_output = vllm_model.generate_greedy(INPUT_PROMPTS, max_tokens) - with VllmRunner(model, - enable_prefix_caching=False, - max_model_len=2048, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - gpu_memory_utilization=0.7) as vllm_model: + with VllmRunner( + model, + enable_prefix_caching=False, + max_model_len=2048, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + gpu_memory_utilization=0.7, + ) as vllm_model: vllm_output = vllm_model.generate_greedy(INPUT_PROMPTS, max_tokens) check_outputs_equal( diff --git a/tests/e2e/multicard/2-cards/test_quantization.py b/tests/e2e/multicard/2-cards/test_quantization.py index da45628b..c3ae5f0b 100644 --- a/tests/e2e/multicard/2-cards/test_quantization.py +++ b/tests/e2e/multicard/2-cards/test_quantization.py @@ -16,7 +16,6 @@ # This file is a part of the vllm-ascend project. # Adapted from vllm/tests/basic_correctness/test_basic_correctness.py # -import pytest from tests.e2e.conftest import VllmRunner @@ -27,16 +26,16 @@ def test_qwen2_5_w8a8_external_quantized_tp2(): ] max_tokens = 5 with VllmRunner( - "neuralmagic/Qwen2.5-3B-quantized.w8a8", - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - max_model_len=4096, - gpu_memory_utilization=0.8, + "neuralmagic/Qwen2.5-3B-quantized.w8a8", + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + max_model_len=4096, + gpu_memory_utilization=0.8, ) as vllm_model: vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) golden_results = [ - 'The president of the United States is the head of state and', + "The president of the United States is the head of state and", ] for i in range(len(vllm_output)): @@ -50,36 +49,37 @@ def test_qwen3_moe_w8a8_dynamic_llm_compressor(): ] max_tokens = 5 with VllmRunner( - "vllm-ascend/Qwen3-30B-A3B-Instruct-2507-quantized.w8a8", - tensor_parallel_size=2, - max_model_len=4096, - gpu_memory_utilization=0.8, + "vllm-ascend/Qwen3-30B-A3B-Instruct-2507-quantized.w8a8", + tensor_parallel_size=2, + max_model_len=4096, + gpu_memory_utilization=0.8, ) as vllm_model: vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) golden_results = [ - 'The president of the United States is the head of state and', + "The president of the United States is the head of state and", ] for i in range(len(vllm_output)): assert golden_results[i] == vllm_output[i][1] print(f"Generated text: {vllm_output[i][1]!r}") + def test_qwen3_moe_w4a8_dynamic_llm_compressor(): example_prompts = [ "The president of the United States is", ] max_tokens = 5 with VllmRunner( - "vllm-ascend/Qwen3-30B-A3B-Instruct-2507-quantized.w4a8", - tensor_parallel_size=2, - max_model_len=4096, - gpu_memory_utilization=0.8, + "vllm-ascend/Qwen3-30B-A3B-Instruct-2507-quantized.w4a8", + tensor_parallel_size=2, + max_model_len=4096, + gpu_memory_utilization=0.8, ) as vllm_model: vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) golden_results = [ - 'The president of the United States is the head of state and', + "The president of the United States is the head of state and", ] for i in range(len(vllm_output)): diff --git a/tests/e2e/multicard/2-cards/test_qwen3_moe.py b/tests/e2e/multicard/2-cards/test_qwen3_moe.py index 7a8cf77d..385b32e8 100644 --- a/tests/e2e/multicard/2-cards/test_qwen3_moe.py +++ b/tests/e2e/multicard/2-cards/test_qwen3_moe.py @@ -34,11 +34,11 @@ def test_qwen3_moe_distributed_mp_tp2_ep(): ] max_tokens = 5 with VllmRunner( - "Qwen/Qwen3-30B-A3B", - tensor_parallel_size=2, - enable_expert_parallel=True, - cudagraph_capture_sizes=[1, 2, 4, 8], - distributed_executor_backend="mp", + "Qwen/Qwen3-30B-A3B", + tensor_parallel_size=2, + enable_expert_parallel=True, + cudagraph_capture_sizes=[1, 2, 4, 8], + distributed_executor_backend="mp", ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -49,27 +49,27 @@ def test_qwen3_moe_w8a8_distributed_tp2(): ] max_tokens = 5 with VllmRunner( - "vllm-ascend/Qwen3-30B-A3B-W8A8", - max_model_len=8192, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], - quantization="ascend", + "vllm-ascend/Qwen3-30B-A3B-W8A8", + max_model_len=8192, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], + quantization="ascend", ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) def test_qwen3_moe_distributed_aiv_tp2(): - os.environ['HCCL_OP_EXPANSION_MODE'] = 'AIV' + os.environ["HCCL_OP_EXPANSION_MODE"] = "AIV" example_prompts = [ "Hello, my name is", ] dtype = "auto" max_tokens = 5 with VllmRunner( - "Qwen/Qwen3-30B-A3B", - dtype=dtype, - tensor_parallel_size=2, - cudagraph_capture_sizes=[1, 2, 4, 8], + "Qwen/Qwen3-30B-A3B", + dtype=dtype, + tensor_parallel_size=2, + cudagraph_capture_sizes=[1, 2, 4, 8], ) as vllm_model: vllm_model.generate_greedy(example_prompts, max_tokens) @@ -80,23 +80,24 @@ async def test_qwen3_moe_w8a8_distributed_tp2_ep_dynamic_eplb(): port = get_open_port() compilation_config = json.dumps({"cudagraph_capture_sizes": [8]}) server_args = [ - "--max_model_len", "8192", "--tensor_parallel_size", "2", - "--enable_expert_parallel", "--quantization", "ascend", "--port", - str(port), "--compilation-config", compilation_config + "--max_model_len", + "8192", + "--tensor_parallel_size", + "2", + "--enable_expert_parallel", + "--quantization", + "ascend", + "--port", + str(port), + "--compilation-config", + compilation_config, ] env_dict = {"HCCL_BUFFSIZE": "1024"} - with RemoteOpenAIServer(model, - server_args, - server_port=port, - auto_port=False, - env_dict=env_dict) as server: + with RemoteOpenAIServer(model, server_args, server_port=port, auto_port=False, env_dict=env_dict) as server: client = server.get_async_client() - batch = await client.completions.create(model=model, - prompt="What is deeplearning?", - max_tokens=400, - temperature=0, - top_p=1.0, - n=1) + batch = await client.completions.create( + model=model, prompt="What is deeplearning?", max_tokens=400, temperature=0, top_p=1.0, n=1 + ) gt_choices: list[openai.types.CompletionChoice] = batch.choices # dynamic eplb test @@ -108,22 +109,14 @@ async def test_qwen3_moe_w8a8_distributed_tp2_ep_dynamic_eplb(): "dynamic_eplb": True, "expert_heat_collection_interval": 100, "algorithm_execution_interval": 20, - "num_redundant_experts": 2 + "num_redundant_experts": 2, } } server_args.extend(["--additional-config", json.dumps(additional_config)]) - with RemoteOpenAIServer(model, - server_args, - server_port=port, - auto_port=False, - env_dict=env_dict) as server: + with RemoteOpenAIServer(model, server_args, server_port=port, auto_port=False, env_dict=env_dict) as server: client = server.get_async_client() - batch = await client.completions.create(model=model, - prompt="What is deeplearning?", - max_tokens=400, - temperature=0, - top_p=1.0, - n=1) + batch = await client.completions.create( + model=model, prompt="What is deeplearning?", max_tokens=400, temperature=0, top_p=1.0, n=1 + ) eplb_choices: list[openai.types.CompletionChoice] = batch.choices - assert gt_choices[0].text == eplb_choices[ - 0].text, f"{gt_choices[0].text=} \n {eplb_choices[0].text=}" + assert gt_choices[0].text == eplb_choices[0].text, f"{gt_choices[0].text=} \n {eplb_choices[0].text=}" diff --git a/tests/e2e/multicard/2-cards/test_qwen3_moe_routing_replay.py b/tests/e2e/multicard/2-cards/test_qwen3_moe_routing_replay.py index 0876eb57..97f90698 100644 --- a/tests/e2e/multicard/2-cards/test_qwen3_moe_routing_replay.py +++ b/tests/e2e/multicard/2-cards/test_qwen3_moe_routing_replay.py @@ -1,10 +1,11 @@ import os from unittest.mock import patch -from tests.e2e.conftest import VllmRunner from vllm import SamplingParams from vllm.sampling_params import RequestOutputKind +from tests.e2e.conftest import VllmRunner + @patch.dict(os.environ, {"OMP_NUM_THREADS": "1"}) def test_qwen3_moe_routing_replay(): @@ -12,18 +13,15 @@ def test_qwen3_moe_routing_replay(): "Hello, please introduce yourself.", ] with VllmRunner( - "Qwen/Qwen3-30B-A3B", - tensor_parallel_size=2, - enable_expert_parallel=True, - cudagraph_capture_sizes=[1, 2, 4, 8], - distributed_executor_backend="mp", - enable_return_routed_experts=True, + "Qwen/Qwen3-30B-A3B", + tensor_parallel_size=2, + enable_expert_parallel=True, + cudagraph_capture_sizes=[1, 2, 4, 8], + distributed_executor_backend="mp", + enable_return_routed_experts=True, ) as vllm_model: sampling_params = SamplingParams( - max_tokens=5, - temperature=0.8, - top_p=0.95, - output_kind=RequestOutputKind.FINAL_ONLY + max_tokens=5, temperature=0.8, top_p=0.95, output_kind=RequestOutputKind.FINAL_ONLY ) inputs = vllm_model.get_inputs(prompts=prompts) outputs = vllm_model.model.generate(prompts=inputs, sampling_params=sampling_params) diff --git a/tests/e2e/multicard/2-cards/test_qwen3_performance.py b/tests/e2e/multicard/2-cards/test_qwen3_performance.py index c6e6378e..7c3b409f 100644 --- a/tests/e2e/multicard/2-cards/test_qwen3_performance.py +++ b/tests/e2e/multicard/2-cards/test_qwen3_performance.py @@ -84,11 +84,7 @@ async def test_models(model: str) -> None: request_keyword_args: dict[str, Any] = { **api_keyword_args, } - with RemoteOpenAIServer(model, - server_args, - server_port=port, - env_dict=env_dict, - auto_port=False) as server: + with RemoteOpenAIServer(model, server_args, server_port=port, env_dict=env_dict, auto_port=False) as server: client = server.get_async_client() batch = await client.completions.create( model=model, diff --git a/tests/e2e/multicard/2-cards/test_shared_expert_dp.py b/tests/e2e/multicard/2-cards/test_shared_expert_dp.py index d7a8984f..c60e2f37 100644 --- a/tests/e2e/multicard/2-cards/test_shared_expert_dp.py +++ b/tests/e2e/multicard/2-cards/test_shared_expert_dp.py @@ -13,69 +13,65 @@ MODELS = [ @pytest.mark.parametrize("model", MODELS) def test_deepseek_v2_lite_enable_shared_expert_dp_tp2(model: str) -> None: - - if 'HCCL_OP_EXPANSION_MODE' in os.environ: - del os.environ['HCCL_OP_EXPANSION_MODE'] + if "HCCL_OP_EXPANSION_MODE" in os.environ: + del os.environ["HCCL_OP_EXPANSION_MODE"] prompts = [ - "Hello, my name is", "The capital of the United States is", - "The capital of France is", "The future of AI is" + "Hello, my name is", + "The capital of the United States is", + "The capital of France is", + "The future of AI is", ] sampling_params = SamplingParams(max_tokens=32, temperature=0.0) with VllmRunner( - model, - max_model_len=1024, - enforce_eager=True, - tensor_parallel_size=2, - enable_expert_parallel=True, + model, + max_model_len=1024, + enforce_eager=True, + tensor_parallel_size=2, + enable_expert_parallel=True, ) as runner: vllm_eager_outputs = runner.model.generate(prompts, sampling_params) os.environ["VLLM_ASCEND_ENABLE_FLASHCOMM1"] = "1" with VllmRunner( - model, - max_model_len=1024, - enforce_eager=True, - tensor_parallel_size=2, - enable_expert_parallel=True, - additional_config={ - "enable_shared_expert_dp": True, - }, + model, + max_model_len=1024, + enforce_eager=True, + tensor_parallel_size=2, + enable_expert_parallel=True, + additional_config={ + "enable_shared_expert_dp": True, + }, ) as runner: - shared_expert_dp_eager_outputs = runner.model.generate( - prompts, sampling_params) + shared_expert_dp_eager_outputs = runner.model.generate(prompts, sampling_params) with VllmRunner( - model, - max_model_len=1024, - tensor_parallel_size=2, - enable_expert_parallel=True, - compilation_config={ - "cudagraph_capture_sizes": [1, 4, 8, 16], - "cudagraph_mode": "FULL_DECODE_ONLY", - }, - additional_config={ - "enable_shared_expert_dp": True, - }, + model, + max_model_len=1024, + tensor_parallel_size=2, + enable_expert_parallel=True, + compilation_config={ + "cudagraph_capture_sizes": [1, 4, 8, 16], + "cudagraph_mode": "FULL_DECODE_ONLY", + }, + additional_config={ + "enable_shared_expert_dp": True, + }, ) as runner: - shared_expert_dp_aclgraph_outputs = runner.model.generate( - prompts, sampling_params) + shared_expert_dp_aclgraph_outputs = runner.model.generate(prompts, sampling_params) vllm_eager_outputs_list = [] for output in vllm_eager_outputs: - vllm_eager_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + vllm_eager_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) shared_expert_dp_eager_outputs_list = [] for output in shared_expert_dp_eager_outputs: - shared_expert_dp_eager_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + shared_expert_dp_eager_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) shared_expert_dp_aclgraph_outputs_list = [] for output in shared_expert_dp_aclgraph_outputs: - shared_expert_dp_aclgraph_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + shared_expert_dp_aclgraph_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) check_outputs_equal( outputs_0_lst=vllm_eager_outputs_list, diff --git a/tests/e2e/multicard/2-cards/test_single_request_aclgraph.py b/tests/e2e/multicard/2-cards/test_single_request_aclgraph.py index 90b5d7e5..78dded63 100644 --- a/tests/e2e/multicard/2-cards/test_single_request_aclgraph.py +++ b/tests/e2e/multicard/2-cards/test_single_request_aclgraph.py @@ -39,8 +39,7 @@ api_keyword_args = { @pytest.mark.asyncio @pytest.mark.parametrize("model", MODELS) @pytest.mark.parametrize("dp_size", DATA_PARALLELS) -async def test_models_single_request_aclgraph_dp2(model: str, - dp_size: int) -> None: +async def test_models_single_request_aclgraph_dp2(model: str, dp_size: int) -> None: port = get_open_port() env_dict = { "TASK_QUEUE_ENABLE": "1", @@ -48,36 +47,51 @@ async def test_models_single_request_aclgraph_dp2(model: str, } if model == "vllm-ascend/DeepSeek-V2-Lite-W8A8": server_args = [ - "--no-enable-prefix-caching", "--tensor-parallel-size", "1", + "--no-enable-prefix-caching", + "--tensor-parallel-size", + "1", "--data-parallel-size", - str(dp_size), "--quantization", "ascend", "--max-model-len", - "1024", "--port", - str(port), "--trust-remote-code", "--gpu-memory-utilization", "0.9" + str(dp_size), + "--quantization", + "ascend", + "--max-model-len", + "1024", + "--port", + str(port), + "--trust-remote-code", + "--gpu-memory-utilization", + "0.9", ] else: server_args = [ - "--no-enable-prefix-caching", "--tensor-parallel-size", "1", + "--no-enable-prefix-caching", + "--tensor-parallel-size", + "1", "--data-parallel-size", - str(dp_size), "--port", - str(port), "--trust-remote-code", "--gpu-memory-utilization", "0.9" + str(dp_size), + "--port", + str(port), + "--trust-remote-code", + "--gpu-memory-utilization", + "0.9", ] request_keyword_args: dict[str, Any] = { **api_keyword_args, } - with RemoteOpenAIServer(model, - vllm_serve_args=server_args, - server_port=port, - env_dict=env_dict, - auto_port=False) as server: + with RemoteOpenAIServer( + model, vllm_serve_args=server_args, server_port=port, env_dict=env_dict, auto_port=False + ) as server: client = server.get_async_client() try: - batch = await asyncio.wait_for(client.completions.create( - model=model, - prompt=prompts, - **request_keyword_args, - ), - timeout=10.0) + batch = await asyncio.wait_for( + client.completions.create( + model=model, + prompt=prompts, + **request_keyword_args, + ), + timeout=10.0, + ) except asyncio.TimeoutError: pytest.fail("Model did not return response within 10 seconds") diff --git a/tests/e2e/multicard/2-cards/test_sp_pass.py b/tests/e2e/multicard/2-cards/test_sp_pass.py index c3e99dfc..88d0c885 100644 --- a/tests/e2e/multicard/2-cards/test_sp_pass.py +++ b/tests/e2e/multicard/2-cards/test_sp_pass.py @@ -1,5 +1,3 @@ -import os - import pytest from vllm import SamplingParams @@ -14,47 +12,46 @@ MODELS = [ @pytest.mark.parametrize("model", MODELS) def test_qwen3_vl_sp_tp2(model: str) -> None: prompts = [ - "Hello, my name is", "The capital of the United States is", - "The capital of France is", "The future of AI is" + "Hello, my name is", + "The capital of the United States is", + "The capital of France is", + "The future of AI is", ] sampling_params = SamplingParams(max_tokens=10, temperature=0.0) with VllmRunner( - model, - max_model_len=1024, - tensor_parallel_size=2, - compilation_config={ - "cudagraph_capture_sizes": [2, 4], - "cudagraph_mode": "FULL_DECODE_ONLY", - "pass_config": {"enable_sp": False} - }, - additional_config={"ascend_compilation_config": {"enable_npugraph_ex": False}} + model, + max_model_len=1024, + tensor_parallel_size=2, + compilation_config={ + "cudagraph_capture_sizes": [2, 4], + "cudagraph_mode": "FULL_DECODE_ONLY", + "pass_config": {"enable_sp": False}, + }, + additional_config={"ascend_compilation_config": {"enable_npugraph_ex": False}}, ) as runner: no_sp_outputs = runner.model.generate(prompts, sampling_params) with VllmRunner( - model, - max_model_len=1024, - tensor_parallel_size=2, - compilation_config={ - "cudagraph_capture_sizes": [2, 4], - "cudagraph_mode": "FULL_DECODE_ONLY", - "pass_config": {"enable_sp": True} - }, - additional_config={"sp_threshold": 10, "ascend_compilation_config": {"enable_npugraph_ex": False}} + model, + max_model_len=1024, + tensor_parallel_size=2, + compilation_config={ + "cudagraph_capture_sizes": [2, 4], + "cudagraph_mode": "FULL_DECODE_ONLY", + "pass_config": {"enable_sp": True}, + }, + additional_config={"sp_threshold": 10, "ascend_compilation_config": {"enable_npugraph_ex": False}}, ) as runner: - sp_outputs = runner.model.generate( - prompts, sampling_params) + sp_outputs = runner.model.generate(prompts, sampling_params) no_sp_outputs_list = [] for output in no_sp_outputs: - no_sp_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + no_sp_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) sp_outputs_list = [] for output in sp_outputs: - sp_outputs_list.append( - (output.outputs[0].index, output.outputs[0].text)) + sp_outputs_list.append((output.outputs[0].index, output.outputs[0].text)) check_outputs_equal( outputs_0_lst=no_sp_outputs_list,