From f10acddb78d5ba24ef29e18d8663e6d9a68583f3 Mon Sep 17 00:00:00 2001 From: wangxiyuan Date: Sat, 29 Nov 2025 16:18:34 +0800 Subject: [PATCH] drop ascend scheduler (#4498) Ascend scheduler was added for non chunk prefill case before, since that the npu ops didn't work well with chunked prefill. Now the ops with chunked prefill work better, it's time to remove the ascend scheduler to use vLLM default scheduler. - vLLM version: v0.11.2 --------- Signed-off-by: wangxiyuan --- .github/workflows/_e2e_test.yaml | 2 - docs/source/tutorials/DeepSeek-V3.2-Exp.md | 10 +- docs/source/tutorials/multi_node.md | 4 +- docs/source/tutorials/multi_node_kimi.md | 4 +- docs/source/tutorials/multi_npu_moge.md | 5 - .../configuration/additional_config.md | 19 - .../user_guide/feature_guide/graph_mode.md | 4 +- examples/offline_inference_npu_long_seq.py | 1 - examples/run_dp_server.sh | 2 +- .../test_offline_inference_parallel_310p.py | 5 +- tests/e2e/multicard/test_expert_parallel.py | 21 +- .../multicard/test_fused_moe_allgather_ep.py | 16 +- .../test_offline_inference_distributed.py | 14 +- tests/e2e/multicard/test_prefix_caching.py | 66 +- tests/e2e/multicard/test_qwen3_next.py | 9 +- .../e2e/multicard/test_torchair_graph_mode.py | 12 - .../test_mtpx_deepseek_r1_0528_w8a8.py | 6 +- ...test_prefix_cache_deepseek_r1_0528_w8a8.py | 3 - .../test_prefix_cache_qwen3_32b_int8.py | 7 +- .../test_qwen3_32b_int8_a3_feature_stack3.py | 3 +- .../models/test_deepseek_r1_0528_w8a8.py | 8 - .../models/test_deepseek_r1_w8a8_eplb.py | 3 - .../models/test_deepseek_v3_2_exp_w8a8.py | 1 - .../e2e/nightly/models/test_qwen2_5_vl_32b.py | 5 +- .../models/test_qwen3_235b_a22b_w8a8_eplb.py | 6 +- .../nightly/models/test_qwen3_235b_w8a8.py | 6 - tests/e2e/nightly/models/test_qwq_32b.py | 2 - .../models/DeepSeek-R1-W8A8-A2-torchair.yaml | 4 +- .../config/models/DeepSeek-R1-W8A8-A2.yaml | 4 +- .../config/models/DeepSeek-R1-W8A8-EPLB.yaml | 8 +- .../config/models/DeepSeek-R1-W8A8.yaml | 8 +- .../config/models/DeepSeek-V3_2-Exp-bf16.yaml | 4 +- .../spec_decode_v1/test_v1_mtp_correctness.py | 41 +- tests/e2e/singlecard/test_ascend_scheduler.py | 170 -- tests/e2e/singlecard/test_chunked.py | 82 - tests/e2e/singlecard/test_vlm.py | 35 - tests/ut/core/test_schedule_config.py | 134 -- tests/ut/core/test_scheduler.py | 1473 ----------------- tests/ut/ops/test_linear.py | 1 - tests/ut/ops/test_vocab_parallel_embedding.py | 7 +- tests/ut/quantization/test_w8a8_dynamic.py | 7 - tests/ut/test_ascend_config.py | 9 - tests/ut/test_platform.py | 26 - tests/ut/test_utils.py | 4 +- .../models/test_torchair_deepseek_v2.py | 2 - vllm_ascend/ascend_config.py | 19 - vllm_ascend/core/schedule_config.py | 105 -- vllm_ascend/core/scheduler.py | 592 ------- vllm_ascend/platform.py | 37 +- vllm_ascend/profiling_config.py | 5 - vllm_ascend/torchair/torchair_attention.py | 5 +- vllm_ascend/worker/model_runner_v1.py | 7 +- 52 files changed, 85 insertions(+), 2948 deletions(-) delete mode 100644 tests/e2e/singlecard/test_ascend_scheduler.py delete mode 100644 tests/e2e/singlecard/test_chunked.py delete mode 100644 tests/ut/core/test_schedule_config.py delete mode 100644 tests/ut/core/test_scheduler.py delete mode 100644 vllm_ascend/core/schedule_config.py delete mode 100644 vllm_ascend/core/scheduler.py diff --git a/.github/workflows/_e2e_test.yaml b/.github/workflows/_e2e_test.yaml index 843a53a6..dea87b1b 100644 --- a/.github/workflows/_e2e_test.yaml +++ b/.github/workflows/_e2e_test.yaml @@ -91,10 +91,8 @@ jobs: pytest -sv tests/e2e/singlecard/test_completion_with_prompt_embeds.py pytest -sv tests/e2e/singlecard/test_aclgraph.py pytest -sv tests/e2e/singlecard/test_aclgraph_mem.py - pytest -sv tests/e2e/singlecard/test_ascend_scheduler.py pytest -sv tests/e2e/singlecard/test_bge_model.py pytest -sv tests/e2e/singlecard/test_camem.py - pytest -sv tests/e2e/singlecard/test_chunked.py pytest -sv tests/e2e/singlecard/test_embedding.py # pytest -sv tests/e2e/singlecard/test_embedding_aclgraph.py pytest -sv tests/e2e/singlecard/test_guided_decoding.py diff --git a/docs/source/tutorials/DeepSeek-V3.2-Exp.md b/docs/source/tutorials/DeepSeek-V3.2-Exp.md index f00f8b40..73bc3dc9 100644 --- a/docs/source/tutorials/DeepSeek-V3.2-Exp.md +++ b/docs/source/tutorials/DeepSeek-V3.2-Exp.md @@ -108,7 +108,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` ### Multi-node Deployment @@ -160,7 +160,7 @@ vllm serve /root/.cache/Modelers_Park/DeepSeek-V3.2-Exp \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` **Node 1** @@ -204,7 +204,7 @@ vllm serve /root/.cache/Modelers_Park/DeepSeek-V3.2-Exp \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` :::: @@ -252,7 +252,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --quantization ascend \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` **Node 1** @@ -299,7 +299,7 @@ vllm serve vllm-ascend/DeepSeek-V3.2-Exp-W8A8 \ --quantization ascend \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' +--additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' ``` :::: diff --git a/docs/source/tutorials/multi_node.md b/docs/source/tutorials/multi_node.md index 68c7056b..d04fa090 100644 --- a/docs/source/tutorials/multi_node.md +++ b/docs/source/tutorials/multi_node.md @@ -137,7 +137,7 @@ vllm serve vllm-ascend/DeepSeek-V3.1-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' +--additional-config '{"torchair_graph_config":{"enabled":true}}' ``` **Node 1** @@ -182,7 +182,7 @@ vllm serve vllm-ascend/DeepSeek-V3.1-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' +--additional-config '{"torchair_graph_config":{"enabled":true}}' ``` The deployment view looks like: diff --git a/docs/source/tutorials/multi_node_kimi.md b/docs/source/tutorials/multi_node_kimi.md index cb28bca9..84840cdf 100644 --- a/docs/source/tutorials/multi_node_kimi.md +++ b/docs/source/tutorials/multi_node_kimi.md @@ -93,7 +93,7 @@ vllm serve /home/cache/weights/Kimi-K2-Instruct-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.9 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' +--additional-config '{"torchair_graph_config":{"enabled":true}}' ``` **Node 1** @@ -137,7 +137,7 @@ vllm serve /home/cache/weights/Kimi-K2-Instruct-W8A8 \ --trust-remote-code \ --no-enable-prefix-caching \ --gpu-memory-utilization 0.92 \ ---additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true}}' +--additional-config '{"torchair_graph_config":{"enabled":true}}' ``` The deployment view looks like: diff --git a/docs/source/tutorials/multi_npu_moge.md b/docs/source/tutorials/multi_npu_moge.md index e426c0f3..8a2cf007 100644 --- a/docs/source/tutorials/multi_npu_moge.md +++ b/docs/source/tutorials/multi_npu_moge.md @@ -158,11 +158,6 @@ if __name__ == "__main__": 'torchair_graph_config': { 'enabled': True, }, - 'ascend_scheduler_config':{ - 'enabled': True, - 'enable_chunked_prefill' : False, - 'chunked_prefill_enabled': False - }, }) outputs = llm.generate(prompts, sampling_params) diff --git a/docs/source/user_guide/configuration/additional_config.md b/docs/source/user_guide/configuration/additional_config.md index 448f2ec4..a77d0d53 100644 --- a/docs/source/user_guide/configuration/additional_config.md +++ b/docs/source/user_guide/configuration/additional_config.md @@ -27,7 +27,6 @@ The following table lists additional configuration options available in vLLM Asc | Name | Type | Default | Description | |-------------------------------------|------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------| | `torchair_graph_config` | dict | `{}` | Configuration options for torchair graph mode | -| `ascend_scheduler_config` | dict | `{}` | Configuration options for ascend scheduler | | `weight_prefetch_config` | dict | `{}` | Configuration options for weight prefetch | | `refresh` | bool | `false` | Whether to refresh global Ascend configuration content. This is usually used by rlhf or ut/e2e test case. | | `expert_map_path` | str | `None` | When using expert load balancing for an MoE model, an expert map path needs to be passed in. | @@ -61,18 +60,6 @@ The details of each configuration option are as follows: | `enable_kv_nz`| bool | `False` | Whether to enable KV Cache NZ layout. This option only takes effect on models using MLA (for example, DeepSeek). | | `enable_super_kernel` | bool | `False` | Whether to enable super kernel to fuse operators in deepseek moe layers. This option only takes effects on moe models using dynamic w8a8 quantization.| -**ascend_scheduler_config** - -| Name | Type | Default | Description | -| ---- | ---- | ------- | ----------- | -| `enabled` | bool | `False` | Whether to enable ascend scheduler for V1 engine.| -| `enable_pd_transfer` | bool | `False` | Whether to enable P-D transfer. When it is enabled, decode is started only when prefill of all requests is done. This option only takes effect on offline inference. | -| `decode_max_num_seqs` | int | `0` | Whether to change max_num_seqs of decode phase when P-D transfer is enabled. This option only takes effect when enable_pd_transfer is True. | -| `max_long_partial_prefills` | Union[int, float] | `float('inf')` | The maximum number of prompts longer than long_prefill_token_threshold that will be prefilled concurrently. | -| `long_prefill_token_threshold` | Union[int, float] | `float('inf')` | a request is considered long if the prompt is longer than this number of tokens. | - -ascend_scheduler_config also supports the options from [vllm scheduler config](https://docs.vllm.ai/en/stable/api/vllm/config.html#vllm.config.SchedulerConfig). For example, you can add `enable_chunked_prefill: True` to ascend_scheduler_config as well. - **weight_prefetch_config** | Name | Type | Default | Description | @@ -93,12 +80,6 @@ An example of additional configuration is as follows: "graph_batch_sizes_init": False, "enable_kv_nz": False }, - "ascend_scheduler_config": { - "enabled": True, - "enable_chunked_prefill": True, - "max_long_partial_prefills": 1, - "long_prefill_token_threshold": 4096, - }, "weight_prefetch_config": { "enabled": True, "prefetch_ratio": { diff --git a/docs/source/user_guide/feature_guide/graph_mode.md b/docs/source/user_guide/feature_guide/graph_mode.md index 43236289..9afa1d52 100644 --- a/docs/source/user_guide/feature_guide/graph_mode.md +++ b/docs/source/user_guide/feature_guide/graph_mode.md @@ -45,14 +45,14 @@ import os from vllm import LLM # TorchAirGraph only works without chunked-prefill now -model = LLM(model="path/to/DeepSeek-R1-0528", additional_config={"torchair_graph_config": {"enabled": True},"ascend_scheduler_config": {"enabled": True}}) +model = LLM(model="path/to/DeepSeek-R1-0528", additional_config={"torchair_graph_config": {"enabled": True}}) outputs = model.generate("Hello, how are you?") ``` Online example: ```shell -vllm serve path/to/DeepSeek-R1-0528 --additional-config='{"torchair_graph_config": {"enabled": true},"ascend_scheduler_config": {"enabled": true}}' +vllm serve path/to/DeepSeek-R1-0528 --additional-config='{"torchair_graph_config": {"enabled": true}}' ``` You can find more details about additional configuration [here](../configuration/additional_config.md). diff --git a/examples/offline_inference_npu_long_seq.py b/examples/offline_inference_npu_long_seq.py index 2ed96f63..7e3afa01 100644 --- a/examples/offline_inference_npu_long_seq.py +++ b/examples/offline_inference_npu_long_seq.py @@ -42,7 +42,6 @@ if __name__ == "__main__": enable_chunked_prefill=False, max_num_batched_tokens=2048, max_model_len=1024, - additional_config={"ascend_scheduler_config": {"enabled": False}}, max_num_seqs=1, block_size=128, gpu_memory_utilization=0.9 diff --git a/examples/run_dp_server.sh b/examples/run_dp_server.sh index 9b9868c4..ec0cb686 100644 --- a/examples/run_dp_server.sh +++ b/examples/run_dp_server.sh @@ -28,4 +28,4 @@ vllm serve Qwen/Qwen1.5-MoE-A2.7B \ --gpu-memory-utilization 0.9 \ --trust-remote-code \ --enforce-eager \ - --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":false, "use_cached_graph":false}}' + --additional-config '{"torchair_graph_config":{"enabled":false, "use_cached_graph":false}}' diff --git a/tests/e2e/310p/test_offline_inference_parallel_310p.py b/tests/e2e/310p/test_offline_inference_parallel_310p.py index 6bf33568..bb973973 100644 --- a/tests/e2e/310p/test_offline_inference_parallel_310p.py +++ b/tests/e2e/310p/test_offline_inference_parallel_310p.py @@ -24,15 +24,12 @@ from tests.e2e.conftest import VllmRunner MODELS = [ "IntervitensInc/pangu-pro-moe-model", ] -# set additional config for ascend scheduler and torchair graph +# set additional config for torchair graph ADDITIONAL_CONFIG = [{ "additional_config": { "torchair_graph_config": { "enabled": True }, - "ascend_scheduler_config": { - "enabled": True, - } } }] diff --git a/tests/e2e/multicard/test_expert_parallel.py b/tests/e2e/multicard/test_expert_parallel.py index f1076013..b8f03d5f 100644 --- a/tests/e2e/multicard/test_expert_parallel.py +++ b/tests/e2e/multicard/test_expert_parallel.py @@ -15,23 +15,14 @@ def test_e2e_ep_correctness(model_name): max_tokens = 5 # FIXME: Really strange that chunked prefill might lead to different results, investigate further - with VllmRunner( - model_name, - tensor_parallel_size=2, - additional_config={"ascend_scheduler_config": { - "enabled": True - }}, - enforce_eager=False) as vllm_model: + with VllmRunner(model_name, tensor_parallel_size=2, + enforce_eager=False) as vllm_model: tp_output = vllm_model.generate_greedy(example_prompts, max_tokens) - with VllmRunner( - model_name, - tensor_parallel_size=2, - enable_expert_parallel=True, - additional_config={"ascend_scheduler_config": { - "enabled": True - }}, - enforce_eager=False) as vllm_model: + with VllmRunner(model_name, + tensor_parallel_size=2, + enable_expert_parallel=True, + enforce_eager=False) as vllm_model: ep_output = vllm_model.generate_greedy(example_prompts, max_tokens) check_outputs_equal( diff --git a/tests/e2e/multicard/test_fused_moe_allgather_ep.py b/tests/e2e/multicard/test_fused_moe_allgather_ep.py index 9335e19a..85d246e5 100644 --- a/tests/e2e/multicard/test_fused_moe_allgather_ep.py +++ b/tests/e2e/multicard/test_fused_moe_allgather_ep.py @@ -49,13 +49,7 @@ def test_generate_with_allgather(): tensor_parallel_size=2, max_model_len=1024, dtype="auto", - enable_expert_parallel=True, - additional_config={ - "ascend_scheduler_config": { - "enabled": True, - "chunked_prefill_enabled": False, - }, - }) as vllm_model: + enable_expert_parallel=True) as vllm_model: vllm_model.generate(example_prompts, sampling_params) @@ -76,11 +70,5 @@ def test_generate_with_alltoall(): tensor_parallel_size=2, max_model_len=1024, dtype="auto", - enable_expert_parallel=True, - additional_config={ - "ascend_scheduler_config": { - "enabled": True, - "chunked_prefill_enabled": False, - }, - }) as vllm_model: + enable_expert_parallel=True) as vllm_model: vllm_model.generate(example_prompts, sampling_params) diff --git a/tests/e2e/multicard/test_offline_inference_distributed.py b/tests/e2e/multicard/test_offline_inference_distributed.py index 320c3bdf..1380c49e 100644 --- a/tests/e2e/multicard/test_offline_inference_distributed.py +++ b/tests/e2e/multicard/test_offline_inference_distributed.py @@ -82,9 +82,6 @@ def test_models_distributed_DeepSeek_multistream_moe(): "enabled": True, }, "enable_multistream_moe": True, - "ascend_scheduler_config": { - "enabled": True, - }, "refresh": True, }, ) as vllm_model: @@ -154,14 +151,9 @@ def test_models_distributed_DeepSeek_W4A8DYNAMIC(model): quantization="ascend", enforce_eager=True, enable_expert_parallel=True, - additional_config={ - "torchair_graph_config": { - "enabled": False, - }, - "ascend_scheduler_config": { - "enabled": True, - } - }, + additional_config={"torchair_graph_config": { + "enabled": False, + }}, ) as vllm_model: vllm_model.generate_greedy(prompts, max_tokens) diff --git a/tests/e2e/multicard/test_prefix_caching.py b/tests/e2e/multicard/test_prefix_caching.py index e2991662..e076fd01 100644 --- a/tests/e2e/multicard/test_prefix_caching.py +++ b/tests/e2e/multicard/test_prefix_caching.py @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project -"""Compare the with and without prefix caching on V1 scheduler or AscendScheduler.""" +"""Compare the with and without prefix caching on V1 scheduler.""" import pytest @@ -84,67 +84,3 @@ def test_prefix_cache_with_v1_scheduler(model: str, max_tokens: int) -> None: name_0="vllm_output", name_1="prefix_cache_output", ) - - -@pytest.mark.skip(reason="Fix me, the accuracy is not correct") -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("max_tokens", [50]) -def test_prefix_cache_with_ascend_scheduler(model: str, - max_tokens: int) -> None: - - with VllmRunner(model, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - }, - }, - enforce_eager=False, - max_model_len=2048, - tensor_parallel_size=2, - gpu_memory_utilization=0.7) as vllm_model: - vllm_output = vllm_model.generate_greedy(INPUT_PROMPTS, max_tokens) - - with VllmRunner(model, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - 'enable_prefix_caching': True, - }, - }, - enforce_eager=False, - max_model_len=2048, - tensor_parallel_size=2, - gpu_memory_utilization=0.7) as vllm_model: - prefix_cache_output = vllm_model.generate_greedy( - INPUT_PROMPTS, max_tokens) - - # TODO: enable apc and chunked prefill with ascend scheduler will lead accuracy problem. - # Disable it now. Fix it or drop the ascend scheduler in the future. - # with VllmRunner(model, - # additional_config={ - # 'ascend_scheduler_config': { - # 'enabled': True, - # 'enable_prefix_caching': True, - # "enable_chunked_prefill": True, - # }, - # }, - # enforce_eager=True, - # max_model_len=2048, - # tensor_parallel_size=2, - # gpu_memory_utilization=0.7) as vllm_model: - # chunk_prefill_prefix_cache_output = vllm_model.generate_greedy( - # INPUT_PROMPTS, max_tokens) - - check_outputs_equal( - outputs_0_lst=vllm_output, - outputs_1_lst=prefix_cache_output, - name_0="vllm_output", - name_1="prefix_cache_output", - ) - - # check_outputs_equal( - # outputs_0_lst=chunk_prefill_prefix_cache_output, - # outputs_1_lst=prefix_cache_output, - # name_0="chunk_prefill_prefix_cache_output", - # name_1="prefix_cache_output", - # ) diff --git a/tests/e2e/multicard/test_qwen3_next.py b/tests/e2e/multicard/test_qwen3_next.py index e51748ea..6df2da48 100644 --- a/tests/e2e/multicard/test_qwen3_next.py +++ b/tests/e2e/multicard/test_qwen3_next.py @@ -24,6 +24,7 @@ Run `pytest tests/e2e/multicard/test_qwen3_next.py`. import os from unittest.mock import patch +import pytest from modelscope import snapshot_download # type: ignore from tests.e2e.conftest import VllmRunner @@ -63,6 +64,8 @@ def test_models_distributed_Qwen3_NEXT_TP4_FULL_DECODE_ONLY(): del vllm_model +@pytest.mark.skip( + reason="Qwen3-Next + MTP doesn't work with chunked prefill. Fix Me") def test_models_distributed_Qwen3_NEXT_MTP_TP4_SIMILARITY(): example_prompts = [ "Hello, my name is", @@ -89,12 +92,6 @@ def test_models_distributed_Qwen3_NEXT_MTP_TP4_SIMILARITY(): gpu_memory_utilization=0.8, distributed_executor_backend="mp", enforce_eager=True, - additional_config={ - "ascend_scheduler_config": { - "enabled": True, - "enable_chunked_prefill": False - } - }, speculative_config={ "method": "qwen3_next_mtp", "num_speculative_tokens": 1 diff --git a/tests/e2e/multicard/test_torchair_graph_mode.py b/tests/e2e/multicard/test_torchair_graph_mode.py index a6f3f16d..ea53f848 100644 --- a/tests/e2e/multicard/test_torchair_graph_mode.py +++ b/tests/e2e/multicard/test_torchair_graph_mode.py @@ -44,9 +44,6 @@ def _deepseek_torchair_test_fixture( kwargs = {} if not use_v1_schduler: kwargs = { - "ascend_scheduler_config": { - "enabled": True, - }, "refresh": True, } additional_config.update(**kwargs) @@ -120,9 +117,6 @@ def _pangu_torchair_test_fixture( # torchair is only work without chunked-prefill now kwargs = { - "ascend_scheduler_config": { - "enabled": True, - }, "refresh": True, } additional_config.update(**kwargs) @@ -185,9 +179,6 @@ def _qwen_torchair_test_fixture( "torchair_graph_config": { "enabled": False, }, - "ascend_scheduler_config": { - "enabled": True, - }, "refresh": True, } @@ -244,9 +235,6 @@ def _deepseek_v2_lite_torchair_test_fixure( kwargs = {} if not use_v1_schduler: kwargs = { - "ascend_scheduler_config": { - "enable": True, - }, "refresh": True, } additional_config.update(**kwargs) diff --git a/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py index 65d01b21..880b44ae 100644 --- a/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/features/test_mtpx_deepseek_r1_0528_w8a8.py @@ -73,11 +73,7 @@ async def test_models(model: str, mode: str) -> None: "VLLM_RPC_TIMEOUT": "3600000", "VLLM_EXECUTE_MODEL_TIMEOUT_SECONDS": "3600000" } - additional_config: dict[str, Any] = { - "ascend_scheduler_config": { - "enabled": False - }, - } + additional_config: dict[str, Any] = {} speculative_config = { "num_speculative_tokens": 2, "method": "deepseek_mtp" diff --git a/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py index 8ac1883d..80157588 100644 --- a/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/features/test_prefix_cache_deepseek_r1_0528_w8a8.py @@ -74,9 +74,6 @@ async def test_models(model: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", } additional_config = { - "ascend_scheduler_config": { - "enabled": False - }, "torchair_graph_config": { "enabled": True, "enable_multistream_moe": False, diff --git a/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py b/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py index 3ee23287..fdf7167b 100644 --- a/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py +++ b/tests/e2e/nightly/features/test_prefix_cache_qwen3_32b_int8.py @@ -68,12 +68,7 @@ aisbench_cases75 = [{ async def test_models(model: str) -> None: port = get_open_port() env_dict = {"TASK_QUEUE_ENABLE": "1", "HCCL_OP_EXPANSION_MODE": "AIV"} - additional_config = { - "ascend_scheduler_config": { - "enabled": False - }, - "enable_weight_nz_layout": True - } + additional_config = {"enable_weight_nz_layout": True} server_args = [ "--quantization", "ascend", "--reasoning-parser", "qwen3", "--tensor-parallel-size", "4", "--port", diff --git a/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py b/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py index 17a7f4b6..9fa2d1e5 100644 --- a/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py +++ b/tests/e2e/nightly/features/test_qwen3_32b_int8_a3_feature_stack3.py @@ -83,8 +83,7 @@ async def test_models(model: str, tp_size: int) -> None: "0.9", "--block-size", "128", "--max-num-seqs", "256", "--enforce-eager", "--max-model-len", "35840", "--max-num-batched-tokens", "35840", "--additional-config", - '{"ascend_scheduler_config":{"enabled":true},"enable_weight_nz_layout":true}', - "--compilation-config", + '{"enable_weight_nz_layout":true}', "--compilation-config", '{"cudagraph_mode":"FULL_DECODE_ONLY", "cudagraph_capture_sizes":[1,8,24,48,60]}' ] with RemoteOpenAIServer(model, diff --git a/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py b/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py index c9126577..3dd80d4a 100644 --- a/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py +++ b/tests/e2e/nightly/models/test_deepseek_r1_0528_w8a8.py @@ -33,7 +33,6 @@ MODES = [ "single", "aclgraph", "aclgraph_mlapo", - "no_chunkprefill", ] prompts = [ @@ -82,9 +81,6 @@ async def test_models(model: str, mode: str) -> None: "method": "deepseek_mtp" } additional_config = { - "ascend_scheduler_config": { - "enabled": False - }, "torchair_graph_config": { "enabled": True, "enable_multistream_moe": False, @@ -112,10 +108,6 @@ async def test_models(model: str, mode: str) -> None: if mode == "aclgraph_mlapo": env_dict["VLLM_ASCEND_ENABLE_MLAPO"] = "1" additional_config["torchair_graph_config"] = {"enabled": False} - if mode == "no_chunkprefill": - additional_config["ascend_scheduler_config"] = {"enabled": True} - i = server_args.index("--max-num-batched-tokens") + 1 - server_args[i] = "36864" server_args.extend(["--additional-config", json.dumps(additional_config)]) request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py b/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py index bca2baf0..6413aba0 100644 --- a/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py +++ b/tests/e2e/nightly/models/test_deepseek_r1_w8a8_eplb.py @@ -71,9 +71,6 @@ async def test_models(model: str) -> None: "cudagraph_mode": "FULL_DECODE_ONLY" } additional_config: dict[str, Any] = { - "ascend_scheduler_config": { - "enabled": False - }, "torchair_graph_config": { "enabled": True }, diff --git a/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py b/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py index 217b2786..73cd8405 100644 --- a/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py +++ b/tests/e2e/nightly/models/test_deepseek_v3_2_exp_w8a8.py @@ -92,7 +92,6 @@ async def test_models(model: str, tp_size: int, dp_size: int, "--gpu-memory-utilization", "0.9", "--additional-config", - '{"ascend_scheduler_config":{"enabled":true},' '"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}', ] if full_graph: diff --git a/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py b/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py index fe6bbedf..77c1a7e1 100644 --- a/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py +++ b/tests/e2e/nightly/models/test_qwen2_5_vl_32b.py @@ -85,9 +85,8 @@ async def test_models(model: str, tp_size: int) -> None: str(tp_size), "--port", str(port), "--max-model-len", "30000", "--max-num-batched-tokens", "40000", "--max-num-seqs", "400", "--trust-remote-code", - "--gpu-memory-utilization", "0.8", "--additional-config", - '{"ascend_scheduler_config":{"enabled":false}}', - "--compilation_config", '{"cudagraph_mode": "FULL_DECODE_ONLY"}' + "--gpu-memory-utilization", "0.8", "--compilation_config", + '{"cudagraph_mode": "FULL_DECODE_ONLY"}' ] request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py b/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py index 945d7cae..efbf77d2 100644 --- a/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py +++ b/tests/e2e/nightly/models/test_qwen3_235b_a22b_w8a8_eplb.py @@ -60,11 +60,7 @@ async def test_models(model: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", "VLLM_ASCEND_ENABLE_FLASHCOMM1": "1" } - additional_config: dict[str, Any] = { - "ascend_scheduler_config": { - "enabled": False - }, - } + additional_config: dict[str, Any] = {} compilation_config = {"cudagraph_mode": "FULL_DECODE_ONLY"} server_args = [ "--quantization", "ascend", "--async-scheduling", diff --git a/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py b/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py index 8220e4d5..055a452e 100644 --- a/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py +++ b/tests/e2e/nightly/models/test_qwen3_235b_w8a8.py @@ -63,11 +63,6 @@ async def test_models(model: str, mode: str) -> None: "PYTORCH_NPU_ALLOC_CONF": "expandable_segments:True", "VLLM_ASCEND_ENABLE_FLASHCOMM1": "1" } - additional_config: dict[str, Any] = { - "ascend_scheduler_config": { - "enabled": False - }, - } compilation_config = {"cudagraph_mode": "FULL_DECODE_ONLY"} server_args = [ "--quantization", "ascend", "--async-scheduling", @@ -82,7 +77,6 @@ async def test_models(model: str, mode: str) -> None: server_args.extend( ["--compilation-config", json.dumps(compilation_config)]) - server_args.extend(["--additional-config", json.dumps(additional_config)]) request_keyword_args: dict[str, Any] = { **api_keyword_args, } diff --git a/tests/e2e/nightly/models/test_qwq_32b.py b/tests/e2e/nightly/models/test_qwq_32b.py index a60eff22..824651ba 100644 --- a/tests/e2e/nightly/models/test_qwq_32b.py +++ b/tests/e2e/nightly/models/test_qwq_32b.py @@ -93,8 +93,6 @@ async def test_models(model: str, mode: str, tp_size: int) -> None: server_args.remove( '{"cudagraph_mode":"FULL_DECODE_ONLY", "cudagraph_capture_sizes": [1, 8, 24, 48, 60]}' ) - server_args.append("--additional-config") - server_args.append('{"ascend_scheduler_config":{"enabled":true}}') server_args.append("--enforce-eager") request_keyword_args: dict[str, Any] = { **api_keyword_args, diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml index 42b70f76..7bfe3f5e 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2-torchair.yaml @@ -30,7 +30,7 @@ deployment: --quantization ascend --gpu-memory-utilization 0.9 --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -51,7 +51,7 @@ deployment: --quantization ascend --gpu-memory-utilization 0.9 --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"torchair_graph_config":{"enabled":true,"enable_multistream_moe":true},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' benchmarks: acc: case_type: accuracy diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml index cf44bc8f..01100f29 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-A2.yaml @@ -31,7 +31,7 @@ deployment: --gpu-memory-utilization 0.9 --enforce-eager --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"ascend_scheduler_config":{"enabled":false},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -53,5 +53,5 @@ deployment: --gpu-memory-utilization 0.9 --enforce-eager --speculative-config '{"num_speculative_tokens": 1, "method":"deepseek_mtp"}' - --additional-config '{"ascend_scheduler_config":{"enabled":false},"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' + --additional-config '{"chunked_prefill_for_mla":true,"enable_weight_nz_layout":true}' benchmarks: diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml index 9a4c3d94..6ca189c4 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8-EPLB.yaml @@ -50,7 +50,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > @@ -80,7 +80,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -111,7 +111,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -141,7 +141,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' + '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true,"dynamic_eplb":true,"num_iterations_eplb_update":2048,"num_wait_worker_iterations":200}' benchmarks: perf: case_type: performance diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml index a8e49290..37a024b9 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-R1-W8A8.yaml @@ -49,7 +49,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' + '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' - server_cmd: > @@ -79,7 +79,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' + '{"torchair_graph_config":{"enabled":false,"enable_multistream_shared_expert":false},"enable_prefill_optimizations":true,"enable_weight_nz_layout":true}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -110,7 +110,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' + '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' - server_cmd: > vllm serve vllm-ascend/DeepSeek-R1-0528-W8A8 @@ -140,7 +140,7 @@ deployment: "kv_connector_module_path": "vllm_ascend.distributed.llmdatadist_c_mgr_connector" }' --additional-config - '{"ascend_scheduler_config":{"enabled":false},"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' + '{"torchair_graph_config":{"enabled":true,"enable_multistream_mla":true,"graph_batch_sizes":[28],"use_cached_graph":true,"enable_super_kernel":false},"multistream_overlap_shared_expert":true}' benchmarks: perf: case_type: performance diff --git a/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml index 6dafd3cc..40ac6476 100644 --- a/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml +++ b/tests/e2e/nightly/multi_node/config/models/DeepSeek-V3_2-Exp-bf16.yaml @@ -29,7 +29,7 @@ deployment: --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.9 - --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' + --additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' - server_cmd: > @@ -49,5 +49,5 @@ deployment: --trust-remote-code --no-enable-prefix-caching --gpu-memory-utilization 0.92 - --additional-config '{"ascend_scheduler_config":{"enabled":true},"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' + --additional-config '{"torchair_graph_config":{"enabled":true,"graph_batch_sizes":[16]}}' benchmarks: diff --git a/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py b/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py index 2f56d9d2..6b90ec36 100644 --- a/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py +++ b/tests/e2e/singlecard/spec_decode_v1/test_v1_mtp_correctness.py @@ -48,27 +48,26 @@ def mtp_correctness(sampling_config: SamplingParams, if graph_mode == CUDAGraphMode.FULL: graph_mode_str = "FULL_DECODE_ONLY" - with VllmRunner( - model_name, - tensor_parallel_size=1, - max_num_seqs=256, - gpu_memory_utilization=0.7, - distributed_executor_backend="mp", - enable_expert_parallel=True, - speculative_config={ - "method": "deepseek_mtp", - "num_speculative_tokens": num_speculative_tokens, - "disable_padded_drafter_batch": disable_padded_drafter_batch, - }, - enforce_eager=enforce_eager, - max_model_len=2000, - compilation_config=CompilationConfig( - cudagraph_mode=graph_mode_str, - cudagraph_capture_sizes=[12], - ), - additional_config={"ascend_scheduler_config": { - "enabled": False - }}) as spec_llm: + with VllmRunner(model_name, + tensor_parallel_size=1, + max_num_seqs=256, + gpu_memory_utilization=0.7, + distributed_executor_backend="mp", + enable_expert_parallel=True, + speculative_config={ + "method": + "deepseek_mtp", + "num_speculative_tokens": + num_speculative_tokens, + "disable_padded_drafter_batch": + disable_padded_drafter_batch, + }, + enforce_eager=enforce_eager, + max_model_len=2000, + compilation_config=CompilationConfig( + cudagraph_mode=graph_mode_str, + cudagraph_capture_sizes=[12], + )) as spec_llm: spec_outputs = spec_llm.generate(example_prompts, sampling_config) matches = 0 diff --git a/tests/e2e/singlecard/test_ascend_scheduler.py b/tests/e2e/singlecard/test_ascend_scheduler.py deleted file mode 100644 index 502a8103..00000000 --- a/tests/e2e/singlecard/test_ascend_scheduler.py +++ /dev/null @@ -1,170 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import pytest -from vllm import SamplingParams - -from tests.e2e.conftest import VllmRunner -from tests.e2e.model_utils import check_outputs_equal - -MODEL = "Qwen/Qwen3-0.6B" - - -@pytest.mark.parametrize("enforce_eager", [True, False]) -def test_concurrent_partial_prefill(enforce_eager): - with VllmRunner(MODEL, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - }, - }, - max_num_seqs=3, - max_num_batched_tokens=8192, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7) as vllm_model: - outputs = vllm_model.model.generate(["Hello my name is Robert and I"] * - 3) - assert len(outputs) == 3 - for output in outputs: - assert len(output.outputs) == 1 - - -@pytest.mark.parametrize("enforce_eager", [True, False]) -def test_prefix_cache_stats_is_recorded(enforce_eager): - with VllmRunner(MODEL, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - }, - }, - max_num_seqs=3, - max_num_batched_tokens=8192, - enforce_eager=enforce_eager, - gpu_memory_utilization=0.7) as vllm_model: - # 17 tokens will make sure first 16 tokens are cached in a block - input_tokens = {"prompt_token_ids": [101] * 129} - _ = vllm_model.model.generate([input_tokens]) - outputs = vllm_model.model.generate([input_tokens]) - assert outputs[0].num_cached_tokens == 128 - - -@pytest.mark.parametrize("max_tokens", - [4]) # cannot align results when max_tokens > 4 -@pytest.mark.parametrize("chunked_prefill_token_size", [2048]) -def test_chunked_prefill_with_ascend_scheduler( - max_tokens: int, chunked_prefill_token_size: int) -> None: - example_prompts = [ - "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs." - ] - max_num_seqs = chunked_prefill_token_size - max_num_batched_tokens = chunked_prefill_token_size - with VllmRunner(MODEL, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - 'enable_chunked_prefill': True, - }, - }, - max_num_seqs=max_num_seqs, - max_num_batched_tokens=max_num_batched_tokens, - max_model_len=2048, - gpu_memory_utilization=0.7) as vllm_model: - chunked_prefill_output = vllm_model.generate_greedy( - example_prompts, max_tokens) - - with VllmRunner(MODEL, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - }, - }, - max_model_len=2048, - gpu_memory_utilization=0.7) as vllm_model: - vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) - - check_outputs_equal( - outputs_0_lst=vllm_output, - outputs_1_lst=chunked_prefill_output, - name_0="vllm_output", - name_1="chunked_prefill_output", - ) - - -@pytest.mark.parametrize("max_tokens", - [4]) # cannot align results when max_tokens > 4 -@pytest.mark.parametrize("chunked_prefill_token_size", [2048]) -def test_chunked_prefill_with_scheduler_dynamic_batch( - max_tokens: int, chunked_prefill_token_size: int) -> None: - example_prompts = [ - "vLLM is a high-throughput and memory-efficient inference and serving engine for LLMs." - ] - max_num_seqs = chunked_prefill_token_size - max_num_batched_tokens = chunked_prefill_token_size - with VllmRunner(MODEL, - additional_config={ - 'SLO_limits_for_dynamic_batch': 0, - }, - max_num_seqs=max_num_seqs, - max_num_batched_tokens=max_num_batched_tokens, - max_model_len=2048, - gpu_memory_utilization=0.7) as vllm_model: - dynamic_batch_output = vllm_model.generate_greedy( - example_prompts, max_tokens) - - with VllmRunner(MODEL, - additional_config={ - 'SLO_limits_for_dynamic_batch': -1, - }, - max_model_len=2048, - gpu_memory_utilization=0.7) as vllm_model: - vllm_output = vllm_model.generate_greedy(example_prompts, max_tokens) - - check_outputs_equal( - outputs_0_lst=vllm_output, - outputs_1_lst=dynamic_batch_output, - name_0="vllm_output", - name_1="chunked_prefill_output", - ) - - -def test_async_scheduling_eager() -> None: - prompts = [ - "Hello, my name is", - "The president of the United States is", - "The capital of France is", - "The future of AI is", - ] * 10 - sampling_params = SamplingParams(temperature=0.2, - max_tokens=10, - stop_token_ids=None) - - with VllmRunner( - "Qwen/Qwen2.5-0.5B-Instruct", - max_model_len=4096, - max_num_seqs=50, - dtype="bfloat16", - gpu_memory_utilization=0.9, - async_scheduling=True, - ) as vllm_model: - vllm_model.generate(prompts, sampling_params=sampling_params) - - -def test_async_scheduling_with_full_graph() -> None: - prompts = [ - "Hello, my name is", - "The president of the United States is", - "The capital of France is", - "The future of AI is", - ] * 10 - sampling_params = SamplingParams(temperature=0.2, - max_tokens=10, - stop_token_ids=None) - - with VllmRunner("Qwen/Qwen3-8B", - max_model_len=4096, - max_num_seqs=50, - dtype="bfloat16", - gpu_memory_utilization=0.9, - async_scheduling=True, - compilation_config={"cudagraph_mode": - "FULL"}) as vllm_model: - vllm_model.generate(prompts, sampling_params=sampling_params) diff --git a/tests/e2e/singlecard/test_chunked.py b/tests/e2e/singlecard/test_chunked.py deleted file mode 100644 index f6eacb71..00000000 --- a/tests/e2e/singlecard/test_chunked.py +++ /dev/null @@ -1,82 +0,0 @@ -# -# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# Copyright 2023 The vLLM team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -""" -Compare the outputs of vLLM with and without aclgraph. - -Run `pytest tests/compile/test_aclgraph.py`. -""" -import gc - -import pytest -import torch -from vllm import SamplingParams - -from tests.e2e.conftest import VllmRunner - -MODELS = ["Qwen/Qwen2.5-0.5B-Instruct"] - - -@pytest.mark.parametrize("model", MODELS) -@pytest.mark.parametrize("max_tokens", [1]) -def test_models( - model: str, - max_tokens: int, -) -> None: - prompts = ["The president of the United States is"] - - sampling_params = SamplingParams( - max_tokens=max_tokens, - temperature=0.0, - ) - - with VllmRunner(model, - long_prefill_token_threshold=20, - enforce_eager=False) as vllm_model: - output1 = vllm_model.generate(prompts, sampling_params) - - with VllmRunner(model, - enforce_eager=False, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True - }, - }) as vllm_model: - output2 = vllm_model.generate(prompts, sampling_params) - - # Extract the generated token IDs for comparison - token_ids1 = output1[0][0][0] - token_ids2 = output2[0][0][0] - - print(f"Token IDs 1: {token_ids1}") - print(f"Token IDs 2: {token_ids2}") - - # Convert token IDs to tensors and calculate cosine similarity - # Take the length of a shorter sequence to ensure consistent dimensions - min_len = min(len(token_ids1), len(token_ids2)) - - tensor1 = torch.tensor(token_ids1[:min_len], dtype=torch.float32) - tensor2 = torch.tensor(token_ids2[:min_len], dtype=torch.float32) - - # Calculate similarity using torch.cosine_similarity - similarity = torch.cosine_similarity(tensor1, tensor2, dim=0) - print(f"Token IDs cosine similarity: {similarity.item()}") - - assert similarity > 0.95 - - gc.collect() - torch.npu.empty_cache() - torch.npu.reset_peak_memory_stats() diff --git a/tests/e2e/singlecard/test_vlm.py b/tests/e2e/singlecard/test_vlm.py index cc3d50f8..95456679 100644 --- a/tests/e2e/singlecard/test_vlm.py +++ b/tests/e2e/singlecard/test_vlm.py @@ -20,7 +20,6 @@ Run `pytest tests/test_offline_inference.py`. """ -import pytest from vllm import SamplingParams from vllm.assets.audio import AudioAsset from vllm.assets.image import ImageAsset @@ -55,40 +54,6 @@ def test_multimodal_vl(prompt_template): assert output_str, "Generated output should not be empty." -@pytest.mark.skip(reason="This e2e test will stuck in multi-batch scenario. " - "Add this back after fixing the issue.") -def test_multimodal_ascend_scheduler(prompt_template): - image = ImageAsset("cherry_blossom") \ - .pil_image.convert("RGB") - img_questions = [ - "What is the content of this image?", - "Describe the content of this image in detail.", - "What's in the image?", - "Where is this image taken?", - ] - images = [image] * len(img_questions) - prompts = prompt_template(img_questions) - with VllmRunner("Qwen/Qwen2.5-VL-3B-Instruct", - max_model_len=4096, - additional_config={ - 'ascend_scheduler_config': { - 'enabled': True, - }, - }, - mm_processor_kwargs={ - "min_pixels": 28 * 28, - "max_pixels": 1280 * 28 * 28, - "fps": 1, - }, - enforce_eager=True) as vllm_model: - outputs = vllm_model.generate_greedy(prompts=prompts, - images=images, - max_tokens=64) - assert len(outputs) == len(prompts) - for _, output_str in outputs: - assert output_str, "Generated output should not be empty." - - def test_multimodal_audio(): audio_prompt = "".join([ f"Audio {idx+1}: <|audio_bos|><|AUDIO|><|audio_eos|>\n" diff --git a/tests/ut/core/test_schedule_config.py b/tests/ut/core/test_schedule_config.py deleted file mode 100644 index 032a1a87..00000000 --- a/tests/ut/core/test_schedule_config.py +++ /dev/null @@ -1,134 +0,0 @@ -# -# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from vllm.config import SchedulerConfig - -from tests.ut.base import TestBase -from vllm_ascend.core.schedule_config import AscendSchedulerConfig - - -class TestAscendSchedulerConfig(TestBase): - - def setUp(self): - self.basic_scheduler_config = SchedulerConfig( - max_num_batched_tokens=8192, - max_model_len=8192, - is_multimodal_model=False, - send_delta_data=False, - ) - - def test_initialize_from_config_with_default(self): - # No additional config given, check the default value here. - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, {}) - self.assertEqual(ascend_config.enable_chunked_prefill, False) - self.assertEqual(ascend_config.policy, "fcfs") - self.assertEqual(ascend_config.scheduler_cls, - "vllm_ascend.core.scheduler.AscendScheduler") - self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - - def test_initialize_from_config_with_override(self): - # test override - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=False, - policy="fcfs", - scheduler_cls="vllm_ascend.core.scheduler.AscendScheduler", - max_num_batched_tokens=8192, - max_model_len=2048, - max_long_partial_prefills=1, - long_prefill_token_threshold=512, - ), - ) - self.assertEqual(ascend_config.enable_chunked_prefill, False) - self.assertEqual(ascend_config.policy, "fcfs") - self.assertEqual(ascend_config.scheduler_cls, - "vllm_ascend.core.scheduler.AscendScheduler") - self.assertEqual(ascend_config.max_num_batched_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - self.assertEqual(ascend_config.max_long_partial_prefills, 1) - self.assertEqual(ascend_config.long_prefill_token_threshold, 512) - - def test_not_implemented_policy(self): - with self.assertRaises(NotImplementedError) as context: - AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - policy="custom_policy", - max_num_batched_tokens=8192, - max_model_len=2048, - ), - ) - self.assertIn( - "currently AscendScheduler only supports fcfs policy", - str(context.exception), - ) - - def test_no_override(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, {}) - self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - - def test_valid_config_with_multimodal(self): - config = AscendSchedulerConfig.initialize_from_config( - SchedulerConfig(is_multimodal_model=True, - max_num_batched_tokens=8192), {}) - self.assertTrue(config.is_multimodal_model) - - def test_valid_config_with_chunked_prefill(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=True, - max_num_batched_tokens=8192, - max_model_len=8192, - ), - ) - self.assertEqual(ascend_config.max_num_batched_tokens, 8192) - self.assertEqual(ascend_config.max_model_len, 8192) - self.assertTrue(ascend_config.enable_chunked_prefill) - - def test_invalid_config_without_chunked_prefill(self): - with self.assertRaises(ValueError) as context: - AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=False, - max_num_batched_tokens=2048, - max_model_len=8192, - ), - ) - self.assertIn( - "Ascend scheduler is enabled without chunked prefill feature", - str(context.exception), - ) - self.assertIn("max_num_batched_tokens (2048)", str(context.exception)) - self.assertIn("max_model_len (8192)", str(context.exception)) - - def test_initialize_from_config_with_pd_transfer(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_pd_transfer=True, - decode_max_num_seqs=48, - max_num_batched_tokens=8192, - max_model_len=4096, - ), - ) - self.assertEqual(ascend_config.enable_pd_transfer, True) - self.assertEqual(ascend_config.decode_max_num_seqs, 48) diff --git a/tests/ut/core/test_scheduler.py b/tests/ut/core/test_scheduler.py deleted file mode 100644 index 53af2f47..00000000 --- a/tests/ut/core/test_scheduler.py +++ /dev/null @@ -1,1473 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -from typing import Any, Dict, List, Optional, Tuple -from unittest.mock import MagicMock, patch - -import numpy as np -import torch -from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig, - SchedulerConfig, SpeculativeConfig, VllmConfig) -from vllm.multimodal.inputs import (MultiModalFeatureSpec, - MultiModalKwargsItem, PlaceholderRange) -from vllm.sampling_params import SamplingParams -from vllm.utils.hashing import sha256 -from vllm.v1.core.kv_cache_utils import (get_request_block_hasher, - init_none_hash) -from vllm.v1.core.sched.output import SchedulerOutput -from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig, - KVCacheGroupSpec) -from vllm.v1.outputs import DraftTokenIds, ModelRunnerOutput -from vllm.v1.request import Request, RequestStatus -from vllm.v1.structured_output import StructuredOutputManager - -from tests.ut.base import TestBase -from vllm_ascend.core.scheduler import AscendScheduler -from vllm_ascend.core.scheduler_dynamic_batch import SchedulerDynamicBatch - -EOS_TOKEN_ID = 50256 -MODEL = "Qwen3-0.6B" -ENABLE_PREFIX_CACHING = None -PROMPT_LOGPROBS = None -ENABLE_CHUNKED_PREFILL = False -MAX_NUM_BATCHED_TOKENS = 10000 -LONG_PREFILL_TOKEN_THRESHOLD = 0 -NUM_SPECULATIVE_TOKENS = None -MAX_NUM_SEQS = 16 - - -def create_requests( - num_requests: int, - num_tokens: int = 10, - mm_positions: Optional[list[PlaceholderRange]] = None, - max_tokens: int = 16, - stop_token_ids: Optional[list[int]] = None, - block_size: int = 3, - hash_fn=sha256, -): - init_none_hash(hash_fn) - prompt_logprobs = PROMPT_LOGPROBS - sampling_params = SamplingParams(ignore_eos=False, - max_tokens=max_tokens, - stop_token_ids=stop_token_ids, - prompt_logprobs=prompt_logprobs) - requests = [] - for i in range(num_requests): - mm_features = [] - if mm_positions is not None: - mm_position = mm_positions[i] - for j, position in enumerate(mm_position): - identifier = f"hash{i}_{j}" - mm_feature = MultiModalFeatureSpec( - data=MultiModalKwargsItem.dummy("dummy_m"), - mm_position=position, - identifier=identifier, - modality="image") - mm_features.append(mm_feature) - request = Request(request_id=f"{i}", - prompt_token_ids=[i] * num_tokens, - sampling_params=sampling_params, - eos_token_id=EOS_TOKEN_ID, - pooling_params=None, - mm_features=mm_features if mm_features else None, - block_hasher=get_request_block_hasher( - block_size, hash_fn)) - requests.append(request) - return requests - - -def make_output(scheduler): - req_ids = [req.request_id for req in scheduler.running] - req_id_to_index = { - req.request_id: i - for i, req in enumerate(scheduler.running) - } - sampled_token_ids = [ - np.array([1000], dtype=np.int64) for _ in scheduler.running - ] - - logprobs = None - - modelrunner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_id_to_index, - sampled_token_ids=sampled_token_ids, - logprobs=logprobs, - prompt_logprobs_dict={}, - pooler_output=[], - ) - return modelrunner_output - - -class TestAscendScheduler(TestBase): - - @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) - @patch("vllm.config.VllmConfig.__post_init__", MagicMock()) - @patch('vllm.v1.core.sched.scheduler.compute_encoder_budget') - def create_scheduler(self, mock_compute_encoder_budget): - mock_compute_encoder_budget.return_value = [100, 100] - use_kv_connector = False - block_size = 16 - - scheduler_config = SchedulerConfig( - max_num_seqs=16, - max_model_len=MAX_NUM_BATCHED_TOKENS, - long_prefill_token_threshold=LONG_PREFILL_TOKEN_THRESHOLD, - disable_chunked_mm_input=False, - enable_chunked_prefill=ENABLE_CHUNKED_PREFILL, - max_num_batched_tokens=MAX_NUM_BATCHED_TOKENS, - ) - - scheduler_config.max_num_encoder_input_tokens = 10000 - scheduler_config.encoder_cache_size = 10000 - scheduler_config.chunked_prefill_enabled = False - - model_config = ModelConfig( - model=MODEL, - task="auto", - tokenizer=MODEL, - tokenizer_mode="auto", - trust_remote_code=True, - dtype="float16", - seed=42, - max_model_len=MAX_NUM_BATCHED_TOKENS, - ) - model_config.pooler_config = MagicMock() - model_config.multimodal_config = MagicMock() - model_config.hf_config = MagicMock() - model_config.hf_config.is_encoder_decoder = False - # Cache config, optionally force APC - kwargs_cache: Dict[str, - Any] = ({} if ENABLE_PREFIX_CACHING is None else { - 'enable_prefix_caching': - ENABLE_PREFIX_CACHING - }) - cache_config = CacheConfig( - block_size=block_size, - gpu_memory_utilization=0.9, - swap_space=0, - cache_dtype="auto", - **kwargs_cache, - ) - - kv_transfer_config = KVTransferConfig( - kv_connector="SharedStorageConnector", - kv_role="kv_both", - kv_connector_extra_config={"shared_storage_path": "local_storage"}, - ) if use_kv_connector else None - - speculative_config: Optional[SpeculativeConfig] = None - if NUM_SPECULATIVE_TOKENS is not None: - speculative_config = SpeculativeConfig( - model="ngram", num_speculative_tokens=NUM_SPECULATIVE_TOKENS) - - vllm_config = VllmConfig( - scheduler_config=scheduler_config, - model_config=model_config, - cache_config=cache_config, - kv_transfer_config=kv_transfer_config, - speculative_config=speculative_config, - ) - - kv_cache_config = KVCacheConfig( - num_blocks=10000, # A large number of blocks to hold all requests - kv_cache_tensors=[], - kv_cache_groups=[ - KVCacheGroupSpec(['layer'], - FullAttentionSpec(block_size, 1, 1, - torch.float32, False, - False)) - ], - ) - cache_config.num_gpu_blocks = 10000 - - scheduler = AscendScheduler( - vllm_config=vllm_config, - kv_cache_config=kv_cache_config, - log_stats=True, - block_size=block_size, - structured_output_manager=MagicMock(spec=StructuredOutputManager), - ) - - should_advance = MagicMock() - should_advance.return_value = False - scheduler.structured_output_manager.should_advance = should_advance - - return scheduler - - def test_add_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - - for i, request in enumerate(requests): - scheduler.add_request(request) - self.assertIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), i + 1) - - def test_finish_request(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_ABORTED) - self.assertNotIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), 9 - i) - - def test_get_num_unfinished_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_STOPPED) - self.assertEqual(scheduler.get_num_unfinished_requests(), - len(requests) - i - 1) - - def test_schedule(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_schedule_multimodal_requests(self): - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - mm_positions = [[PlaceholderRange(offset=i, length=10)] - for i in range(10)] - requests = create_requests( - num_requests=10, - mm_positions=mm_positions, - ) - for request in requests: - scheduler.add_request(request) - - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - for req_id, num_tokens in output.num_scheduled_tokens.items(): - assert num_tokens == len(requests[int(req_id)].prompt_token_ids) - - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - self.assertEqual(len(output.scheduled_encoder_inputs), len(requests)) - for req_id, encoder_input in output.scheduled_encoder_inputs.items(): - assert len(encoder_input) == 1 - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_concurrent_partial_prefills_schedule(self): - '''Test concurrent partial prefills scheduling. - total requests = 10, every request has 10 token. - while set long_prefill_token_threshold = 1, scheduler can - only schedule max_long_partial_prefills long request. - ''' - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - scheduler.scheduler_config.max_long_partial_prefills = 2 - scheduler.scheduler_config.long_prefill_token_threshold = 1 - requests = create_requests(num_requests=10, num_tokens=20) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), - scheduler.scheduler_config.max_long_partial_prefills) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - - def test_schedule_enable_prefix_caching(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - global ENABLE_PREFIX_CACHING - ENABLE_PREFIX_CACHING = True - global PROMPT_LOGPROBS - PROMPT_LOGPROBS = 5 - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_stop_via_update_from_output(self): - """Test stopping behavior through update_from_output""" - global NUM_SPECULATIVE_TOKENS - NUM_SPECULATIVE_TOKENS = 1 - scheduler = self.create_scheduler() - - # Test case 1: Stop on EOS token - requests = create_requests(num_requests=2, max_tokens=10) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 1, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [], - requests[1].request_id: [10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([EOS_TOKEN_ID]), - np.array([10, 11]) - ], # First request hits EOS, second continues - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped, second continues - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [EOS_TOKEN_ID]) - self.assertEqual(list(requests[1].output_token_ids), [10, 11]) - - # Test case 2: Stop on custom stop token - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, - max_tokens=10, - stop_token_ids=[42, 43]) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=5, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 42], - requests[1].request_id: [13] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([10, 42, 12]), - np.array([13, 14]) - ], # First request hits stop token - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped on custom token - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) - self.assertEqual(requests[0].stop_reason, 42) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 42]) - self.assertEqual(list(requests[1].output_token_ids), [13, 14]) - - # Test case 3: Stop on max tokens - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, max_tokens=2) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 1 - }, - total_num_scheduled_tokens=4, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 11], - requests[1].request_id: [] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([10, 11, 12]), - np.array([13]) - ], # First request exceeds max_tokens - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped due to length - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, - RequestStatus.FINISHED_LENGTH_CAPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 11]) - self.assertEqual(list(requests[1].output_token_ids), [13]) - - # Test case 4: Ignore EOS flag - scheduler = self.create_scheduler() - requests = create_requests(num_requests=1, max_tokens=10) - requests[0].sampling_params.ignore_eos = True - requests[0].num_computed_tokens = requests[0].num_tokens - scheduler.requests[requests[0].request_id] = requests[0] - scheduler.running.append(requests[0]) - - scheduler_output = SchedulerOutput( - scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={requests[0].request_id: 3}, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [EOS_TOKEN_ID, 10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[np.array([EOS_TOKEN_ID, 10, 11])], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify request continues past EOS - self.assertEqual(len(scheduler.running), 1) - self.assertFalse(requests[0].is_finished()) - self.assertEqual(list(requests[0].output_token_ids), - [EOS_TOKEN_ID, 10, 11]) - - def test_schedule_concurrent_batches(self): - global MAX_NUM_BATCHED_TOKENS - global ENABLE_PREFIX_CACHING - global ENABLE_CHUNKED_PREFILL - global MAX_NUM_SEQS - global PROMPT_LOGPROBS - ENABLE_PREFIX_CACHING = None - MAX_NUM_BATCHED_TOKENS = 1024 - MAX_NUM_SEQS = 2 - ENABLE_CHUNKED_PREFILL = True - PROMPT_LOGPROBS = None - - enable_prefix_caching_list = [None, True] - prompt_logprobs_list = [None, 5] - - for i in range(len(enable_prefix_caching_list)): - ENABLE_PREFIX_CACHING = enable_prefix_caching_list[i] - PROMPT_LOGPROBS = prompt_logprobs_list[i] - scheduler = self.create_scheduler() - requests = create_requests( - num_requests=2, - num_tokens=512, - ) - - # Schedule the first request. - scheduler.add_request(requests[0]) - scheduler_output0 = scheduler.schedule() - self.assertEqual(len(scheduler_output0.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output0.num_scheduled_tokens[requests[0].request_id], - 512) - - # The first request is still running, so only schedule the second request. - scheduler.add_request(requests[1]) - scheduler_output1 = scheduler.schedule() - self.assertEqual(len(scheduler_output1.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output1.num_scheduled_tokens[requests[1].request_id], - 512) - - # Model output of the first request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[np.array([0], dtype=np.int64)], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output0, - model_runner_output) - - # Schedule the next step. - # The first request can be scheduled again while the second - # request is still running. - scheduler.schedule() - # Model output of the second request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[1].request_id], - req_id_to_index={requests[1].request_id: 0}, - sampled_token_ids=[np.array([0], dtype=np.int64)], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output1, - model_runner_output) - - def test_schedule_spec_decoding_stats(self): - """Test scheduling behavior with speculative decoding. - - This test verifies that: - 1. Speculated tokens get scheduled correctly - 2. Spec decoding stats properly count number of draft and accepted tokens - """ - spec_tokens_list: List[List[List[int]]] = [[[1, 2, 3]], [[1, 2, 3]], - [[1, 2], [3]], [[1]], [[]], - [[1, 2, 3], [4, 5, 6]]] - output_tokens_list: List[List[List[int]]] = [ - [np.array([1, 2, 3, 4])], [np.array([1, 5])], - [np.array([1, 2, 5]), np.array([3, 4])], [np.array([1, 2])], - [np.array([5])], [np.array([1, 2, 7]), - np.array([4, 8])] - ] - expected_list: List[Tuple[int, int, - int, List[int]]] = [(1, 3, 3, [1, 1, 1]), - (1, 3, 1, [1, 0, 0]), - (2, 3, 3, [2, 1]), - (1, 1, 1, [1]), - (0, 0, 0, [0]), - (2, 6, 3, [2, 1, 0])] - - global NUM_SPECULATIVE_TOKENS - for idx in range(len(spec_tokens_list)): - spec_tokens = spec_tokens_list[idx] - output_tokens = output_tokens_list[idx] - expected = expected_list[idx] - num_spec_tokens = max(1, max(len(t) for t in spec_tokens)) - NUM_SPECULATIVE_TOKENS = num_spec_tokens - scheduler = self.create_scheduler() - requests = create_requests(num_requests=len(spec_tokens), - num_tokens=1) - req_ids = [] - req_to_index = {} - for i, request in enumerate(requests): - scheduler.add_request(request) - req_ids.append(request.request_id) - req_to_index[request.request_id] = i - - # Schedule a decode, which will also draft speculative tokens - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.total_num_scheduled_tokens, len(requests)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], 1) - self.assertNotIn(req_id, output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=[ - np.array([0]) for _ in range(len(requests)) - ], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - draft_token_ids = DraftTokenIds(req_ids, spec_tokens) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - scheduler.update_draft_token_ids(draft_token_ids) - - for i in range(len(requests)): - running_req = scheduler.running[i] - # The prompt token - self.assertEqual(running_req.num_computed_tokens, 1) - # The prompt token and the sampled token - self.assertEqual(running_req.num_tokens, 2) - # The prompt token, the sampled token, and the speculated tokens - self.assertEqual(running_req.num_tokens_with_spec, - 2 + len(spec_tokens[i])) - - # No draft or accepted tokens counted yet - self.assertTrue( - not engine_core_outputs - or (engine_core_outputs[0].scheduler_stats.spec_decoding_stats - is None)) - - # Schedule the speculated tokens for validation - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), 0) - # The sampled token and speculated tokens - self.assertEqual( - output.total_num_scheduled_tokens, - len(requests) + sum(len(ids) for ids in spec_tokens)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], - 1 + len(spec_tokens[i])) - if spec_tokens[i]: - self.assertEqual( - len(output.scheduled_spec_decode_tokens[req_id]), - len(spec_tokens[i])) - else: - self.assertNotIn(req_id, - output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=output_tokens, - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - - scheduler_stats = engine_core_outputs[0].scheduler_stats \ - if engine_core_outputs else None - if expected[0] == 0: - self.assertIsNone(scheduler_stats.spec_decoding_stats) - else: - self.assertIsNotNone(scheduler_stats.spec_decoding_stats) - stats = scheduler_stats.spec_decoding_stats - self.assertEqual(stats.num_drafts, expected[0]) - self.assertEqual(stats.num_draft_tokens, expected[1]) - self.assertEqual(stats.num_accepted_tokens, expected[2]) - self.assertEqual(stats.num_accepted_tokens_per_pos, - expected[3]) - - def assert_scheduler_empty(self, scheduler): - """Confirm the scheduler is "empty" - i.e. no leaks.""" - # Scheduler Metadata. - scheduler = self.create_scheduler() - self.assertEqual(len(scheduler.requests), 0) - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), 0) - self.assertEqual(len(scheduler.finished_req_ids), 0) - - # EncoderCacheManager. - self.assertEqual(len(scheduler.encoder_cache_manager.freed), 0) - self.assertEqual(len(scheduler.encoder_cache_manager.cached), 0) - - # KVCache Manager. - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - req_to_blocks), 0) - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - num_cached_block), 0) - num_free_blocks = (scheduler.kv_cache_manager.block_pool. - free_block_queue.num_free_blocks) - self.assertEqual( - num_free_blocks, - scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1) - - # NOTE(rob): just the ref count on blocks will be 0. The hash - # value, etc will remain since we lazily evict for prefix cache. - for block in scheduler.kv_cache_manager.block_pool.blocks: - self.assertEqual(block.ref_cnt, 0) - - def test_memory_leak(self): - """Test that we do not have a memory leak.""" - scheduler = self.create_scheduler() - NUM_REQUESTS = 5 - NUM_TOKENS = 10 - MAX_TOKENS = 10 - requests = create_requests(num_requests=NUM_REQUESTS, - num_tokens=NUM_TOKENS, - max_tokens=MAX_TOKENS) - - # Add each request. - for request in requests: - scheduler.add_request(request) - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Iterate until done. - while True: - scheduler_output = scheduler.schedule() - if len(scheduler.running) == 0: - break - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Confirm no memory leak. - self.assert_scheduler_empty(scheduler) - - def test_scheduler_with_pd_transfer(self): - scheduler = self.create_scheduler() - scheduler.phase = "prefill" - requests = create_requests(num_requests=32) - for request in requests: - scheduler.add_request(request) - - # 1st iteration, move 16 requests from waiting to running for prefill - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - first_iter_prefilled_req_num = len(scheduler.running) - self.assertEqual(len(scheduler_output.scheduled_new_reqs), - scheduler.max_num_running_reqs) - self.assertEqual(scheduler_output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(scheduler_output.finished_req_ids), 0) - - # 2nd iteration, move 16 prefilled requests to finished_prefill_reqs - # and move 16 requests from waiting to running for prefill - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - self.assertEqual(len(scheduler.finished_prefill_reqs), - first_iter_prefilled_req_num) - - # 3rd iteration, all requests prefilled, change scheduler phase to decode - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - self.assertEqual(scheduler.phase, "decode") - - -class TestSchedulerDynamicBatch(TestBase): - - @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) - @patch("vllm.config.VllmConfig.__post_init__", MagicMock()) - @patch('vllm.v1.core.sched.scheduler.compute_encoder_budget') - def create_scheduler(self, mock_compute_encoder_budget): - mock_compute_encoder_budget.return_value = [100, 100] - use_kv_connector = False - block_size = 16 - - scheduler_config = SchedulerConfig( - max_num_seqs=16, - max_model_len=MAX_NUM_BATCHED_TOKENS, - long_prefill_token_threshold=LONG_PREFILL_TOKEN_THRESHOLD, - disable_chunked_mm_input=False, - enable_chunked_prefill=True, - max_num_batched_tokens=MAX_NUM_BATCHED_TOKENS, - ) - - scheduler_config.max_num_encoder_input_tokens = 10000 - scheduler_config.encoder_cache_size = 10000 - scheduler_config.chunked_prefill_enabled = True - scheduler_config.SLO_limits_for_dynamic_batch = 0 - - model_config = ModelConfig( - model=MODEL, - task="auto", - tokenizer=MODEL, - tokenizer_mode="auto", - trust_remote_code=True, - dtype="float16", - seed=42, - max_model_len=MAX_NUM_BATCHED_TOKENS, - ) - model_config.pooler_config = MagicMock() - model_config.multimodal_config = MagicMock() - model_config.hf_config = MagicMock() - model_config.hf_config.is_encoder_decoder = False - # Cache config, optionally force APC - kwargs_cache: Dict[str, - Any] = ({} if ENABLE_PREFIX_CACHING is None else { - 'enable_prefix_caching': - ENABLE_PREFIX_CACHING - }) - cache_config = CacheConfig( - block_size=block_size, - gpu_memory_utilization=0.9, - swap_space=0, - cache_dtype="auto", - **kwargs_cache, - ) - - kv_transfer_config = KVTransferConfig( - kv_connector="SharedStorageConnector", - kv_role="kv_both", - kv_connector_extra_config={"shared_storage_path": "local_storage"}, - ) if use_kv_connector else None - - speculative_config: Optional[SpeculativeConfig] = None - if NUM_SPECULATIVE_TOKENS is not None: - speculative_config = SpeculativeConfig( - model="ngram", num_speculative_tokens=NUM_SPECULATIVE_TOKENS) - - vllm_config = VllmConfig( - scheduler_config=scheduler_config, - model_config=model_config, - cache_config=cache_config, - kv_transfer_config=kv_transfer_config, - speculative_config=speculative_config, - ) - - kv_cache_config = KVCacheConfig( - num_blocks=10000, # A large number of blocks to hold all requests - kv_cache_tensors=[], - kv_cache_groups=[ - KVCacheGroupSpec(['layer'], - FullAttentionSpec(block_size, 1, 1, - torch.float32, False)) - ], - ) - cache_config.num_gpu_blocks = 10000 - - scheduler = SchedulerDynamicBatch( - vllm_config=vllm_config, - kv_cache_config=kv_cache_config, - log_stats=True, - structured_output_manager=MagicMock(spec=StructuredOutputManager), - ) - - should_advance = MagicMock() - should_advance.return_value = False - scheduler.structured_output_manager.should_advance = should_advance - - return scheduler - - def test_add_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - - for i, request in enumerate(requests): - scheduler.add_request(request) - self.assertIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), i + 1) - - def test_finish_request(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_ABORTED) - self.assertNotIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), 9 - i) - - def test_get_num_unfinished_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_STOPPED) - self.assertEqual(scheduler.get_num_unfinished_requests(), - len(requests) - i - 1) - - def test_schedule(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = True - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_schedule_multimodal_requests(self): - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = True - mm_positions = [[PlaceholderRange(offset=i, length=10)] - for i in range(10)] - requests = create_requests( - num_requests=10, - mm_positions=mm_positions, - ) - for request in requests: - scheduler.add_request(request) - - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - for req_id, num_tokens in output.num_scheduled_tokens.items(): - assert num_tokens == len(requests[int(req_id)].prompt_token_ids) - - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - self.assertEqual(len(output.scheduled_encoder_inputs), len(requests)) - for req_id, encoder_input in output.scheduled_encoder_inputs.items(): - assert len(encoder_input) == 1 - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_schedule_enable_prefix_caching(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - global ENABLE_PREFIX_CACHING - ENABLE_PREFIX_CACHING = True - global PROMPT_LOGPROBS - PROMPT_LOGPROBS = 5 - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_stop_via_update_from_output(self): - """Test stopping behavior through update_from_output""" - global NUM_SPECULATIVE_TOKENS - NUM_SPECULATIVE_TOKENS = 1 - scheduler = self.create_scheduler() - - # Test case 1: Stop on EOS token - requests = create_requests(num_requests=2, max_tokens=10) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 1, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [], - requests[1].request_id: [10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([EOS_TOKEN_ID]), - np.array([10, 11]) - ], # First request hits EOS, second continues - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped, second continues - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [EOS_TOKEN_ID]) - self.assertEqual(list(requests[1].output_token_ids), [10, 11]) - - # Test case 2: Stop on custom stop token - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, - max_tokens=10, - stop_token_ids=[42, 43]) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=5, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 42], - requests[1].request_id: [13] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([10, 42, 12]), - np.array([13, 14]) - ], # First request hits stop token - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped on custom token - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) - self.assertEqual(requests[0].stop_reason, 42) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 42]) - self.assertEqual(list(requests[1].output_token_ids), [13, 14]) - - # Test case 3: Stop on max tokens - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, max_tokens=2) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 1 - }, - total_num_scheduled_tokens=4, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 11], - requests[1].request_id: [] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[np.array([10, 11, 12]), - np.array([13]) - ], # First request exceeds max_tokens - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped due to length - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, - RequestStatus.FINISHED_LENGTH_CAPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 11]) - self.assertEqual(list(requests[1].output_token_ids), [13]) - - # Test case 4: Ignore EOS flag - scheduler = self.create_scheduler() - requests = create_requests(num_requests=1, max_tokens=10) - requests[0].sampling_params.ignore_eos = True - requests[0].num_computed_tokens = requests[0].num_tokens - scheduler.requests[requests[0].request_id] = requests[0] - scheduler.running.append(requests[0]) - - scheduler_output = SchedulerOutput( - scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={requests[0].request_id: 3}, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [EOS_TOKEN_ID, 10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[np.array([EOS_TOKEN_ID, 10, 11])], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify request continues past EOS - self.assertEqual(len(scheduler.running), 1) - self.assertFalse(requests[0].is_finished()) - self.assertEqual(list(requests[0].output_token_ids), - [EOS_TOKEN_ID, 10, 11]) - - def test_schedule_concurrent_batches(self): - global MAX_NUM_BATCHED_TOKENS - global ENABLE_PREFIX_CACHING - global ENABLE_CHUNKED_PREFILL - global MAX_NUM_SEQS - global PROMPT_LOGPROBS - ENABLE_PREFIX_CACHING = None - MAX_NUM_BATCHED_TOKENS = 1024 - MAX_NUM_SEQS = 2 - ENABLE_CHUNKED_PREFILL = True - PROMPT_LOGPROBS = None - - enable_prefix_caching_list = [None, True] - prompt_logprobs_list = [None, 5] - - for i in range(len(enable_prefix_caching_list)): - ENABLE_PREFIX_CACHING = enable_prefix_caching_list[i] - PROMPT_LOGPROBS = prompt_logprobs_list[i] - scheduler = self.create_scheduler() - requests = create_requests( - num_requests=2, - num_tokens=512, - ) - - # Schedule the first request. - scheduler.add_request(requests[0]) - scheduler_output0 = scheduler.schedule() - self.assertEqual(len(scheduler_output0.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output0.num_scheduled_tokens[requests[0].request_id], - 512) - - # The first request is still running, so only schedule the second request. - scheduler.add_request(requests[1]) - scheduler_output1 = scheduler.schedule() - self.assertEqual(len(scheduler_output1.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output1.num_scheduled_tokens[requests[1].request_id], - 512) - - # Model output of the first request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[np.array([0])], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output0, - model_runner_output) - - # Schedule the next step. - # The first request can be scheduled again while the second - # request is still running. - scheduler.schedule() - # Model output of the second request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[1].request_id], - req_id_to_index={requests[1].request_id: 0}, - sampled_token_ids=[np.array([0])], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output1, - model_runner_output) - - def test_schedule_spec_decoding_stats(self): - """Test scheduling behavior with speculative decoding. - - This test verifies that: - 1. Speculated tokens get scheduled correctly - 2. Spec decoding stats properly count number of draft and accepted tokens - """ - spec_tokens_list: List[List[List[int]]] = [[[1, 2, 3]], [[1, 2, 3]], - [[1, 2], [3]], [[1]], [[]], - [[1, 2, 3], [4, 5, 6]]] - output_tokens_list: List[List[List[int]]] = [ - [np.array([1, 2, 3, 4])], [np.array([1, 5])], - [np.array([1, 2, 5]), np.array([3, 4])], [np.array([1, 2])], - [np.array([5])], [np.array([1, 2, 7]), - np.array([4, 8])] - ] - expected_list: List[Tuple[int, int, - int, List[int]]] = [(1, 3, 3, [1, 1, 1]), - (1, 3, 1, [1, 0, 0]), - (2, 3, 3, [2, 1]), - (1, 1, 1, [1]), - (0, 0, 0, [0]), - (2, 6, 3, [2, 1, 0])] - - global NUM_SPECULATIVE_TOKENS - for idx in range(len(spec_tokens_list)): - spec_tokens = spec_tokens_list[idx] - output_tokens = output_tokens_list[idx] - expected = expected_list[idx] - num_spec_tokens = max(1, max(len(t) for t in spec_tokens)) - NUM_SPECULATIVE_TOKENS = num_spec_tokens - scheduler = self.create_scheduler() - requests = create_requests(num_requests=len(spec_tokens), - num_tokens=1) - req_ids = [] - req_to_index = {} - for i, request in enumerate(requests): - scheduler.add_request(request) - req_ids.append(request.request_id) - req_to_index[request.request_id] = i - - # Schedule a decode, which will also draft speculative tokens - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.total_num_scheduled_tokens, len(requests)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], 1) - self.assertNotIn(req_id, output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=[ - np.array([0]) for _ in range(len(requests)) - ], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - draft_token_ids = DraftTokenIds(req_ids, spec_tokens) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - scheduler.update_draft_token_ids(draft_token_ids) - - for i in range(len(requests)): - running_req = scheduler.running[i] - # The prompt token - self.assertEqual(running_req.num_computed_tokens, 1) - # The prompt token and the sampled token - self.assertEqual(running_req.num_tokens, 2) - # The prompt token, the sampled token, and the speculated tokens - self.assertEqual(running_req.num_tokens_with_spec, - 2 + len(spec_tokens[i])) - - # No draft or accepted tokens counted yet - self.assertTrue( - not engine_core_outputs - or (engine_core_outputs[0].scheduler_stats.spec_decoding_stats - is None)) - - # Schedule the speculated tokens for validation - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), 0) - # The sampled token and speculated tokens - self.assertEqual( - output.total_num_scheduled_tokens, - len(requests) + sum(len(ids) for ids in spec_tokens)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], - 1 + len(spec_tokens[i])) - if spec_tokens[i]: - self.assertEqual( - len(output.scheduled_spec_decode_tokens[req_id]), - len(spec_tokens[i])) - else: - self.assertNotIn(req_id, - output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=output_tokens, - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - - scheduler_stats = engine_core_outputs[0].scheduler_stats \ - if engine_core_outputs else None - if expected[0] == 0: - self.assertIsNone(scheduler_stats.spec_decoding_stats) - else: - self.assertIsNotNone(scheduler_stats.spec_decoding_stats) - stats = scheduler_stats.spec_decoding_stats - self.assertEqual(stats.num_drafts, expected[0]) - self.assertEqual(stats.num_draft_tokens, expected[1]) - self.assertEqual(stats.num_accepted_tokens, expected[2]) - self.assertEqual(stats.num_accepted_tokens_per_pos, - expected[3]) - - def assert_scheduler_empty(self, scheduler): - """Confirm the scheduler is "empty" - i.e. no leaks.""" - # Scheduler Metadata. - scheduler = self.create_scheduler() - self.assertEqual(len(scheduler.requests), 0) - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), 0) - self.assertEqual(len(scheduler.finished_req_ids), 0) - - # EncoderCacheManager. - self.assertEqual(len(scheduler.encoder_cache_manager.freed), 0) - self.assertEqual(len(scheduler.encoder_cache_manager.cached), 0) - - # KVCache Manager. - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - req_to_blocks), 0) - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - num_cached_block), 0) - num_free_blocks = (scheduler.kv_cache_manager.block_pool. - free_block_queue.num_free_blocks) - self.assertEqual( - num_free_blocks, - scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1) - - # NOTE(rob): just the ref count on blocks will be 0. The hash - # value, etc will remain since we lazily evict for prefix cache. - for block in scheduler.kv_cache_manager.block_pool.blocks: - self.assertEqual(block.ref_cnt, 0) - - def test_memory_leak(self): - """Test that we do not have a memory leak.""" - scheduler = self.create_scheduler() - NUM_REQUESTS = 5 - NUM_TOKENS = 10 - MAX_TOKENS = 10 - requests = create_requests(num_requests=NUM_REQUESTS, - num_tokens=NUM_TOKENS, - max_tokens=MAX_TOKENS) - - # Add each request. - for request in requests: - scheduler.add_request(request) - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Iterate until done. - while True: - scheduler_output = scheduler.schedule() - if len(scheduler.running) == 0: - break - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Confirm no memory leak. - self.assert_scheduler_empty(scheduler) diff --git a/tests/ut/ops/test_linear.py b/tests/ut/ops/test_linear.py index 1b3a7268..2f30e4f0 100644 --- a/tests/ut/ops/test_linear.py +++ b/tests/ut/ops/test_linear.py @@ -99,7 +99,6 @@ class TestAscendRowParallelLinear(BaseLinearTest): ascend_config._ASCEND_CONFIG = MagicMock() ascend_config._ASCEND_CONFIG.oproj_tensor_parallel_size = 2 - ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False linear = AscendRowParallelLinear( input_size=16, diff --git a/tests/ut/ops/test_vocab_parallel_embedding.py b/tests/ut/ops/test_vocab_parallel_embedding.py index 37ea1af1..531df281 100644 --- a/tests/ut/ops/test_vocab_parallel_embedding.py +++ b/tests/ut/ops/test_vocab_parallel_embedding.py @@ -209,12 +209,7 @@ class TestAscendLogitsProcessor(unittest.TestCase): return_value=torch.randn(1, self.vocab_size)), patch( "vllm_ascend.ops.vocab_parallel_embedding.get_lmhead_tp_group.all_gather", - return_value=torch.randn(1, self.vocab_size)), - patch( - "vllm_ascend.core.schedule_config.AscendSchedulerConfig.initialize_from_config", - return_value=MagicMock(max_num_batched_tokens=1000, - max_model_len=512, - enable_chunked_prefill=False)) + return_value=torch.randn(1, self.vocab_size)) ] for p in self.patches: diff --git a/tests/ut/quantization/test_w8a8_dynamic.py b/tests/ut/quantization/test_w8a8_dynamic.py index f25192c2..76d510dd 100644 --- a/tests/ut/quantization/test_w8a8_dynamic.py +++ b/tests/ut/quantization/test_w8a8_dynamic.py @@ -33,13 +33,6 @@ class TestAscendW8A8FusedMoEMethod(TestBase): mock_get_ep_group.return_value = mock_ep_group mock_ascend_config = Mock() - # 创建一个具有具体属性的 Mock 对象来表示 ascend_scheduler_config - mock_ascend_scheduler_config = Mock() - mock_ascend_scheduler_config.enabled = False - mock_ascend_scheduler_config.max_num_batched_tokens = 1024 - mock_ascend_scheduler_config.max_model_len = 2048 - mock_ascend_config.ascend_scheduler_config = mock_ascend_scheduler_config - mock_ascend_config.torchair_graph_config = Mock(enabled=False) mock_ascend_config.enable_chunked_prefill = False mock_get_ascend_config.return_value = mock_ascend_config diff --git a/tests/ut/test_ascend_config.py b/tests/ut/test_ascend_config.py index 718bc85f..be066179 100644 --- a/tests/ut/test_ascend_config.py +++ b/tests/ut/test_ascend_config.py @@ -56,9 +56,6 @@ class TestAscendConfig(TestBase): self.assertTrue(torchair_graph_config.enable_frozen_parameter) self.assertFalse(torchair_graph_config.enable_kv_nz) - ascend_scheduler_config = ascend_config.ascend_scheduler_config - self.assertFalse(ascend_scheduler_config.enabled) - @_clean_up_ascend_config def test_init_ascend_config_with_additional_config(self): test_vllm_config = VllmConfig() @@ -74,9 +71,6 @@ class TestAscendConfig(TestBase): "enable_kv_nz": True }, "multistream_overlap_shared_expert": True, - "ascend_scheduler_config": { - "enabled": True - }, "expert_map_path": "test_expert_map_path", "refresh": True, } @@ -94,9 +88,6 @@ class TestAscendConfig(TestBase): self.assertTrue(torchair_graph_config.enable_frozen_parameter) self.assertTrue(torchair_graph_config.enable_kv_nz) - ascend_scheduler_config = ascend_config.ascend_scheduler_config - self.assertTrue(ascend_scheduler_config.enabled) - @_clean_up_ascend_config def test_init_ascend_config_with_refresh(self): test_vllm_config = VllmConfig() diff --git a/tests/ut/test_platform.py b/tests/ut/test_platform.py index 5fe5cde3..6cc070b6 100644 --- a/tests/ut/test_platform.py +++ b/tests/ut/test_platform.py @@ -32,7 +32,6 @@ class TestNPUPlatform(TestBase): def mock_vllm_ascend_config(): mock_ascend_config = MagicMock() mock_ascend_config.torchair_graph_config.enabled = False - mock_ascend_config.ascend_scheduler_config.enabled = False mock_ascend_config.enable_shared_expert_dp = False return mock_ascend_config @@ -522,31 +521,6 @@ class TestNPUPlatform(TestBase): self.platform.check_and_update_config(vllm_config) self.assertEqual(vllm_config.compilation_config.custom_ops, []) - @patch('vllm_ascend.utils.get_ascend_device_type', - return_value=AscendDeviceType._910_93) - @patch("vllm_ascend.ascend_config.check_ascend_config") - @patch("vllm_ascend.ascend_config.init_ascend_config") - @patch( - "vllm_ascend.core.recompute_schedule_config.RecomputeSchedulerConfig.initialize_from_config" - ) - def test_check_and_update_config_ascend_scheduler_config( - self, mock_init_recompute, mock_init_ascend, mock_check_ascend, - mock_soc_version): - mock_ascend_config = TestNPUPlatform.mock_vllm_ascend_config() - mock_ascend_config.ascend_scheduler_config.enabled = True - mock_init_ascend.return_value = mock_ascend_config - vllm_config = TestNPUPlatform.mock_vllm_config() - vllm_config.parallel_config.tensor_parallel_size = 1 - mock_init_recompute.return_value = MagicMock() - - with patch("vllm_ascend.core.schedule_config.AscendSchedulerConfig" - ) as mock_scheduler: - from vllm_ascend import platform - - importlib.reload(platform) - self.platform.check_and_update_config(vllm_config) - mock_scheduler.initialize_from_config.assert_called_once() - @patch('vllm_ascend.platform.get_ascend_config') def test_get_attn_backend_cls_use_v1_and_mla(self, mock_get_ascend_config): mock_config = MagicMock() diff --git a/tests/ut/test_utils.py b/tests/ut/test_utils.py index 29ed7b44..8ff1419e 100644 --- a/tests/ut/test_utils.py +++ b/tests/ut/test_utils.py @@ -253,12 +253,10 @@ class TestUtils(TestBase): model_path = os.path.join(os.path.dirname(__file__), "fake_weight") test_model_config = ModelConfig(model=model_path, enforce_eager=True) test_parallel_config = ParallelConfig() - ascend_config = {"ascend_scheduler_config": {"enabled": False}} test_vllm_config = VllmConfig( model_config=test_model_config, compilation_config=test_compilation_config, - parallel_config=test_parallel_config, - additional_config=ascend_config) + parallel_config=test_parallel_config) utils.update_aclgraph_sizes(test_vllm_config) os.environ['HCCL_OP_EXPANSION_MODE'] = 'AIV' utils.update_aclgraph_sizes(test_vllm_config) diff --git a/tests/ut/torchair/models/test_torchair_deepseek_v2.py b/tests/ut/torchair/models/test_torchair_deepseek_v2.py index 35e1bb99..e1a5625b 100644 --- a/tests/ut/torchair/models/test_torchair_deepseek_v2.py +++ b/tests/ut/torchair/models/test_torchair_deepseek_v2.py @@ -235,8 +235,6 @@ def test_torchair_deepseek_v2_mlp(mock_distributed, base_config): hidden_act="silu", quant_config=None) assert isinstance(mlp.act_fn, TorchairDeepseekV2SiluAndMul) - ascend_config = MagicMock() - ascend_config._ASCEND_CONFIG.ascend_scheduler_config.enabled = False with patch( "vllm_ascend.torchair.models.torchair_deepseek_v2.QuantizationConfig" ) as mock_quant_config: diff --git a/vllm_ascend/ascend_config.py b/vllm_ascend/ascend_config.py index 16d16a4d..f3c3deed 100644 --- a/vllm_ascend/ascend_config.py +++ b/vllm_ascend/ascend_config.py @@ -39,11 +39,6 @@ class AscendConfig: self.torchair_graph_config = TorchairGraphConfig( torchair_graph_config, vllm_config, additional_config) - ascend_scheduler_config = additional_config.get( - "ascend_scheduler_config", {}) - self.ascend_scheduler_config = AscendSchedulerConfig( - ascend_scheduler_config) - # Dump / PrecisionDebugger configuration dump_config_path = additional_config.get("dump_config", None) self.dump_config = DumpConfig(dump_config_path) @@ -220,20 +215,6 @@ class TorchairGraphConfig: ) -class AscendSchedulerConfig: - """ - Configuration Object for ascend_scheduler_config from additional_config - """ - - def __init__(self, ascend_scheduler_config: dict): - self.enabled = ascend_scheduler_config.get("enabled", False) - # Ascend scheduler is based on vllm v0 scheduler, so we should support - # all vllm v0 scheduler configs as well. - for k, v in ascend_scheduler_config.items(): - if not hasattr(self, k): - setattr(self, k, v) - - class DumpConfig: """ Configuration object for dump/PrecisionDebugger settings. diff --git a/vllm_ascend/core/schedule_config.py b/vllm_ascend/core/schedule_config.py deleted file mode 100644 index 32d63cbc..00000000 --- a/vllm_ascend/core/schedule_config.py +++ /dev/null @@ -1,105 +0,0 @@ -# -# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# This file is a part of the vllm-ascend project. -# - -from dataclasses import dataclass, fields -from typing import Type, Union - -from vllm.config import SchedulerConfig - -MAX_INT = 2147483647 - - -@dataclass -class AscendSchedulerConfig(SchedulerConfig): - enable_chunked_prefill: bool = False - max_long_partial_prefills: int = 1 - long_prefill_token_threshold: int = MAX_INT - policy: str = "fcfs" - scheduler_cls: Union[str, Type[object]] = ( - "vllm_ascend.core.scheduler.AscendScheduler") - enable_pd_transfer: bool = False - decode_max_num_seqs: int = 0 - - @classmethod - def initialize_from_config( - cls, - vllm_scheduler_config: SchedulerConfig, - ascend_scheduler_config, - ): - scheduler_config = { - field.name: getattr(vllm_scheduler_config, field.name) - for field in fields(vllm_scheduler_config) if field.init - } - # Override default values into original SchedulerConfig - scheduler_config["enable_chunked_prefill"] = False - scheduler_config["max_long_partial_prefills"] = None - scheduler_config["long_prefill_token_threshold"] = None - scheduler_config["policy"] = "fcfs" - scheduler_config["scheduler_cls"] = ( - "vllm_ascend.core.scheduler.AscendScheduler") - scheduler_config["enable_pd_transfer"] = False - scheduler_config["decode_max_num_seqs"] = 0 - # Override params in original SchedulerConfig with params in ascend_scheduler_config - for k, _ in scheduler_config.items(): - if hasattr(ascend_scheduler_config, k): - scheduler_config[k] = getattr(ascend_scheduler_config, k) - return cls(**scheduler_config) - - def __post_init__(self, *args) -> None: - self.max_num_encoder_input_tokens = self.max_num_batched_tokens - self.encoder_cache_size = self.max_num_batched_tokens - self.chunked_prefill_enabled = self.enable_chunked_prefill - if (self.max_num_batched_tokens < self.max_model_len - and not self.chunked_prefill_enabled): - raise ValueError( - "Ascend scheduler is enabled without chunked prefill feature. " - f"Argument max_num_batched_tokens ({self.max_num_batched_tokens}) is " - f"smaller than max_model_len ({self.max_model_len}). " - "This effectively limits the maximum sequence length to " - "max_num_batched_tokens and makes vLLM reject longer " - "sequences. Please increase max_num_batched_tokens or " - "decrease max_model_len.") - # concurrent partial prefills. Default is 1 meaning not enabled. - if self.max_long_partial_prefills is None: - self.max_long_partial_prefills = 1 - self.long_prefill_token_threshold = MAX_INT - - if self.long_prefill_token_threshold is None or \ - self.long_prefill_token_threshold <= 0: - if self.max_model_len is None: - self.long_prefill_token_threshold = MAX_INT - else: - self.long_prefill_token_threshold = \ - max(1, int(self.max_model_len * 0.04)) - - if self.max_long_partial_prefills < 0: - raise ValueError( - f"max_long_partial_prefills must be non-negative, but got " - f"{self.max_long_partial_prefills}") - if self.long_prefill_token_threshold < 0: - raise ValueError( - f"long_prefill_token_threshold must be non-negative, but got " - f"{self.long_prefill_token_threshold}") - - if self.policy != "fcfs": - raise NotImplementedError( - f"currently AscendScheduler only supports fcfs policy, got {self.policy}" - ) - if getattr(self, "scheduler_delay_factor", 0) > 0: - raise NotImplementedError( - "currently AscendScheduler doesn't support scheduler_delay_factor." - ) diff --git a/vllm_ascend/core/scheduler.py b/vllm_ascend/core/scheduler.py deleted file mode 100644 index 800536d1..00000000 --- a/vllm_ascend/core/scheduler.py +++ /dev/null @@ -1,592 +0,0 @@ -# -# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# This file is a part of the vllm-ascend project. -# -import time -from collections import deque -from typing import Iterable, Optional, Union - -from vllm.config import VllmConfig -from vllm.distributed.kv_events import KVEventBatch -from vllm.logger import logger -from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry -from vllm.utils.math_utils import cdiv -from vllm.v1.core.kv_cache_manager import KVCacheBlocks -from vllm.v1.core.sched.output import NewRequestData, SchedulerOutput -from vllm.v1.core.sched.scheduler import Scheduler -from vllm.v1.engine import EngineCoreEventType, EngineCoreOutputs -from vllm.v1.kv_cache_interface import KVCacheConfig -from vllm.v1.outputs import ModelRunnerOutput -from vllm.v1.request import Request, RequestStatus -from vllm.v1.structured_output import StructuredOutputManager - - -class AscendScheduler(Scheduler): - """This Scheduler extends vllm's original v1 scheduler - with prefill-first scheduling strategy.""" - - def _initialize_common(self) -> None: - """Initialize common attributes shared across all versions.""" - self.scheduled_req_ids: set[str] = set() - self.running: list[Request] = [] - self.finished_prefill_reqs: deque[Request] = deque() - - enable_pd_transfer = getattr(self.scheduler_config, - 'enable_pd_transfer', False) - decode_max_num_seqs = getattr(self.scheduler_config, - 'decode_max_num_seqs', 0) - self.phase = "" if not enable_pd_transfer else "prefill" - self.decode_max_num_running_reqs = max(self.max_num_running_reqs, - decode_max_num_seqs) - - def __init__( - self, - vllm_config: VllmConfig, - kv_cache_config: KVCacheConfig, - structured_output_manager: StructuredOutputManager, - block_size: Optional[int] = None, - mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY, - include_finished_set: bool = False, - log_stats: bool = False, - ) -> None: - # Call the parent class's __init__ method - super().__init__(vllm_config, kv_cache_config, - structured_output_manager, block_size, mm_registry, - include_finished_set, log_stats) - - # Initialize common attributes - self._initialize_common() - - def schedule(self) -> SchedulerOutput: - if self.scheduler_config.chunked_prefill_enabled: - return super().schedule() - scheduled_new_reqs: list[Request] = [] - scheduled_resumed_reqs: list[Request] = [] - scheduled_running_reqs: list[Request] = [] - preempted_reqs: list[Request] = [] - - req_to_new_blocks: dict[str, KVCacheBlocks] = {} - num_scheduled_tokens: dict[str, int] = {} - token_budget = self.max_num_scheduled_tokens - - # Encoder-related. - scheduled_encoder_inputs: dict[str, list[int]] = {} - encoder_budget = self.max_num_encoder_input_tokens - - # Spec decode-related. - scheduled_spec_decode_tokens: dict[str, list[int]] = {} - - # For logging. - scheduled_timestamp = time.monotonic() - - # Record scheduled LoRA requests. - scheduled_loras: set[int] = set() - - # Use a temporary deque to collect requests that need to be skipped - # and put back at the head of the waiting queue later - skipped_waiting_requests: deque[Request] = deque() - - if self.phase == "prefill": - remaining_running_reqs = [] - for request in self.running: - # move request has finished prefill to finished_prefill_reqs - if request.num_tokens > request.num_prompt_tokens: - self.finished_prefill_reqs.append(request) - else: - remaining_running_reqs.append(request) - self.running = remaining_running_reqs - # all request prefilled, change phase to decode - if not self.waiting and not self.running: - self.phase = "decode" - # Skip long prompt requests in prefill stage. - # long_prefill_budget is float('inf') if not use. - if self.vllm_config.scheduler_config.long_prefill_token_threshold == 0: - long_prefill_budget = float('inf') - long_prefill_token_threshold = float('inf') - else: - long_prefill_budget = self.vllm_config.scheduler_config.max_long_partial_prefills - long_prefill_token_threshold = self.vllm_config.scheduler_config.long_prefill_token_threshold - - # Schedule prefill requests first. - while self.waiting and token_budget > 0: - if len(self.running) == (self.decode_max_num_running_reqs - if self.phase == "decode" else - self.max_num_running_reqs): - - break - - request = self.waiting[0] - - def skip_cur_request(): - self.waiting.popleft() - skipped_waiting_requests.appendleft(request) - - # P/D: skip request if still waiting for remote kvs. - if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: - is_ready = self._update_waiting_for_remote_kv(request) - if is_ready: - request.status = RequestStatus.WAITING - else: - skip_cur_request() - continue - - # Check that adding the request still respects the max_loras - # constraint. - if (self.lora_config and request.lora_request and - (len(scheduled_loras) == self.lora_config.max_loras - and request.lora_request.lora_int_id not in scheduled_loras)): - # Scheduling would exceed max_loras, skip. - skip_cur_request() - continue - - num_external_computed_tokens = 0 - load_kv_async = False - - # Get already-cached tokens. - if request.num_computed_tokens == 0: - new_computed_blocks, num_new_local_computed_tokens = \ - self.kv_cache_manager.get_computed_blocks( - request) - - # Get externally-cached tokens if using a KVConnector. - if self.connector is not None: - num_external_computed_tokens, load_kv_async = ( - self.connector.get_num_new_matched_tokens( - request, num_new_local_computed_tokens)) - - # Total computed tokens (local + external). - num_computed_tokens = (num_new_local_computed_tokens + - num_external_computed_tokens) - else: - # P/D: skip checking prefix cache if loaded from remote kvs. - new_computed_blocks = ( - self.kv_cache_manager.create_empty_block_list()) - num_new_local_computed_tokens = 0 - num_computed_tokens = request.num_computed_tokens - - encoder_inputs_to_schedule = None - new_encoder_budget = encoder_budget - - # P/D: loading remote KV, do not allocate for new work. - if load_kv_async: - assert num_external_computed_tokens > 0 - num_new_tokens = 0 - blocks = None - # Number of tokens to be scheduled. - else: - prompt_limit = self._get_prompt_limit(request) - # We use `request.num_tokens` instead of - # `request.num_prompt_tokens` to consider the resumed - # requests, which have output tokens. - num_new_tokens = request.num_tokens - num_computed_tokens - max_tokens_in_kvcache = (self.kv_cache_config.num_blocks * - self.block_size) - prompt_limit = min(prompt_limit, max_tokens_in_kvcache) - - # Finish request that exceeds prompt_limit or kv cache size. - if num_new_tokens > prompt_limit: - logger.warning( - "Input prompt (%d tokens) is too long" - " and exceeds limit of %d", - num_new_tokens, - prompt_limit, - ) - request.status = RequestStatus.FINISHED_IGNORED - self.finished_req_ids.add( # type: ignore - request.request_id) # type: ignore - self.waiting.popleft() - continue - - if num_new_tokens > token_budget: - # Scheduling would exceed token_budget, skip. - skip_cur_request() - continue - assert num_new_tokens > 0 - blocks = new_computed_blocks.blocks[0] - - # Schedule encoder inputs. - if request.has_encoder_inputs: - (encoder_inputs_to_schedule, num_new_tokens, - new_encoder_budget, - _) = self._try_schedule_encoder_inputs( - request, num_computed_tokens, num_new_tokens, - encoder_budget) - if num_new_tokens == 0 or len( - encoder_inputs_to_schedule) == 0: - # The request cannot be scheduled. - break - - watermark = getattr(self.scheduler_config, "watermark", 0.01) - if not self._check_watermark_for_prefill(request, num_new_tokens, - blocks, watermark): - # Scheduling would exceed watermark, skip. - skip_cur_request() - continue - - if num_new_tokens > long_prefill_token_threshold \ - and long_prefill_budget <= 0: - skip_cur_request() - continue - - new_blocks = self.kv_cache_manager.allocate_slots( - request, - num_new_tokens + num_external_computed_tokens, - num_new_local_computed_tokens, - new_computed_blocks=new_computed_blocks, - num_lookahead_tokens=self.num_lookahead_tokens, - delay_cache_blocks=load_kv_async) - if new_blocks is None: - # The request cannot be scheduled. - break - - # KVConnector: update internal state after allocation. - # This information is used to determine if a load is - # needed for this request. - if self.connector is not None: - self.connector.update_state_after_alloc( - request, - new_computed_blocks + new_blocks, - num_external_computed_tokens, - ) - - self.waiting.popleft() - if load_kv_async: - # If loading async, allocate memory and put request - # into the WAITING_FOR_REMOTE_KV state. - skipped_waiting_requests.appendleft(request) - request.status = RequestStatus.WAITING_FOR_REMOTE_KVS - continue - - self.running.append(request) - if self.log_stats: - request.record_event(EngineCoreEventType.SCHEDULED, - scheduled_timestamp) - self.scheduled_req_ids.add(request.request_id) - # Check request status. - if request.status == RequestStatus.WAITING: - scheduled_new_reqs.append(request) - elif request.status == RequestStatus.PREEMPTED: - scheduled_resumed_reqs.append(request) - else: - raise RuntimeError(f"Invalid request status: {request.status}") - - if self.lora_config and request.lora_request: - scheduled_loras.add(request.lora_request.lora_int_id) - - req_to_new_blocks[ - request.request_id] = self.kv_cache_manager.get_blocks( - request.request_id) - # Update request info. - num_scheduled_tokens[request.request_id] = num_new_tokens - token_budget -= num_new_tokens - if num_new_tokens > long_prefill_token_threshold: - long_prefill_budget -= 1 - request.status = RequestStatus.RUNNING - request.num_computed_tokens = num_computed_tokens - # Count the number of prefix cached tokens. - if request.num_cached_tokens < 0: - request.num_cached_tokens = num_computed_tokens - - # Encoder-related. - if encoder_inputs_to_schedule: - scheduled_encoder_inputs[request.request_id] = ( - encoder_inputs_to_schedule) - # Allocate the encoder cache. - for i in encoder_inputs_to_schedule: - self.encoder_cache_manager.allocate(request, i) - encoder_budget = new_encoder_budget - - # Put back any skipped requests at the head of the waiting queue - if skipped_waiting_requests: - self.waiting.extendleft(skipped_waiting_requests) - - if self.phase == "decode": - while len( - self.running - ) < self.decode_max_num_running_reqs and self.finished_prefill_reqs: - request = self.finished_prefill_reqs.popleft() - self.running.append(request) - - # If no prefill requests are scheduled, - # Schedule decode requests next. - if len(self.scheduled_req_ids) == 0: - req_index = 0 - while req_index < len(self.running) and token_budget > 0: - request = self.running[req_index] - if request.request_id in self.scheduled_req_ids: - # This request has already been scheduled. - req_index += 1 - continue - - num_new_tokens = (request.num_tokens_with_spec - - request.num_computed_tokens) - assert (request.num_tokens - request.num_computed_tokens) == 1 - num_new_tokens = min(num_new_tokens, token_budget) - # Make sure the input position does not exceed the max model len. - # This is necessary when using spec decoding. - num_new_tokens = min( - num_new_tokens, - self.max_model_len - request.num_computed_tokens) - - # Schedule encoder inputs. - encoder_inputs_to_schedule = None - new_encoder_budget = encoder_budget - if request.has_encoder_inputs: - (encoder_inputs_to_schedule, num_new_tokens, - new_encoder_budget) = self._try_schedule_encoder_inputs( - request, request.num_computed_tokens, num_new_tokens, - encoder_budget) - - # Check that adding the request still respects the max_loras - # constraint. - if self.lora_config and request.lora_request and ( - len(scheduled_loras) == self.lora_config.max_loras - and request.lora_request.lora_int_id - not in scheduled_loras): - # Scheduling would exceed max_loras, skip. - num_new_tokens = 0 - - if num_new_tokens == 0: - # The request cannot be scheduled because one of the following - # reason: - # 1. No new tokens to schedule. This may happen when PP>1 and - # we have already scheduled all prompt tokens but they are - # not finished yet. - # 2. Adding the request exceeds the max_loras constraint. - # NOTE(woosuk): Here, by doing `continue` instead of `break`, - # we do not strictly follow the FCFS scheduling policy and - # allow the lower-priority requests to be scheduled. - req_index += 1 - continue - - while True: - new_blocks = self.kv_cache_manager.allocate_slots( - request, - num_new_tokens, - num_lookahead_tokens=self.num_lookahead_tokens) - if new_blocks is None: - # The request cannot be scheduled. - # Preempt the lowest-priority request. - preempted_req = self.running.pop() - self.kv_cache_manager.free(preempted_req) - preempted_req.status = RequestStatus.PREEMPTED - preempted_req.num_computed_tokens = 0 - if self.log_stats: - preempted_req.record_event( - EngineCoreEventType.PREEMPTED, - scheduled_timestamp) - self.waiting.appendleft(preempted_req) - preempted_reqs.append(preempted_req) - if preempted_req == request: - # No more request to preempt. - can_schedule = False - break - else: - # The request can be scheduled. - can_schedule = True - break - if not can_schedule: - break - assert new_blocks is not None - - # Schedule the request. - scheduled_running_reqs.append(request) - self.scheduled_req_ids.add(request.request_id) - req_to_new_blocks[request.request_id] = new_blocks - num_scheduled_tokens[request.request_id] = num_new_tokens - token_budget -= num_new_tokens - req_index += 1 - - # Speculative decode related. - if request.spec_token_ids: - num_scheduled_spec_tokens = (num_new_tokens + - request.num_computed_tokens - - request.num_tokens) - if num_scheduled_spec_tokens > 0: - # Trim spec_token_ids list to num_scheduled_spec_tokens. - del request.spec_token_ids[num_scheduled_spec_tokens:] - scheduled_spec_decode_tokens[request.request_id] = ( - request.spec_token_ids) - - # Encoder-related. - if encoder_inputs_to_schedule: - scheduled_encoder_inputs[request.request_id] = ( - encoder_inputs_to_schedule) - # Allocate the encoder cache. - for i in encoder_inputs_to_schedule: - self.encoder_cache_manager.allocate(request, i) - encoder_budget = new_encoder_budget - - # Record scheduled LoRA requests. - if self.lora_config and request.lora_request: - scheduled_loras.add(request.lora_request.lora_int_id) - - # Check if the scheduling constraints are satisfied. - total_num_scheduled_tokens = sum(num_scheduled_tokens.values()) - assert total_num_scheduled_tokens <= self.max_num_scheduled_tokens - assert token_budget >= 0 - assert len( - self.running - ) <= self.decode_max_num_running_reqs if self.phase == "decode" else self.max_num_running_reqs - assert len(scheduled_new_reqs) + len(scheduled_resumed_reqs) + len( - scheduled_running_reqs) <= len(self.running) - - # Get the longest common prefix among all requests in the running queue. - # This can be potentially used for cascade attention. - num_common_prefix_blocks = [0] * len( - self.kv_cache_config.kv_cache_groups) - if self.running: - any_request = self.running[0] - num_common_prefix_blocks = ( - self.kv_cache_manager.get_num_common_prefix_blocks( - any_request.request_id)) - - # Construct the scheduler output. - new_reqs_data = [ - NewRequestData.from_request( - req, req_to_new_blocks[req.request_id].get_block_ids()) - for req in scheduled_new_reqs - ] - - cached_reqs_data = self._make_cached_request_data( - scheduled_running_reqs, scheduled_resumed_reqs, - num_scheduled_tokens, scheduled_spec_decode_tokens, - req_to_new_blocks) - scheduled_cached_reqs = cached_reqs_data - scheduler_output = SchedulerOutput( - scheduled_new_reqs=new_reqs_data, - scheduled_cached_reqs=scheduled_cached_reqs, - num_scheduled_tokens=num_scheduled_tokens, - total_num_scheduled_tokens=total_num_scheduled_tokens, - scheduled_spec_decode_tokens=scheduled_spec_decode_tokens, - scheduled_encoder_inputs=scheduled_encoder_inputs, - num_common_prefix_blocks=num_common_prefix_blocks, - # finished_req_ids is an existing state in the scheduler, - # instead of being newly scheduled in this step. - # It contains the request IDs that are finished in between - # the previous and the current steps. - finished_req_ids=self.finished_req_ids, # type: ignore - free_encoder_mm_hashes=self.encoder_cache_manager. - get_freed_mm_hashes(), - ) - # NOTE(Kuntai): this function is designed for multiple purposes: - # 1. Plan the KV cache store - # 2. Wrap up all the KV cache load / save ops into an opaque object - # 3. Clear the internal states of the connector - if self.connector is not None: - meta = self.connector.build_connector_meta(scheduler_output) - scheduler_output.kv_connector_metadata = meta - - events = self.kv_cache_manager.take_events() - if events: - batch = KVEventBatch(ts=time.time(), events=events) - self.kv_event_publisher.publish(batch) - - # Advance the number of computed tokens for the request AFTER - # the request is scheduled. - # 1. The scheduler_output of the current step has to include the - # original number of scheduled tokens to determine input IDs. - # 2. Advance the number of computed tokens here allowing us to - # schedule the prefill request again immediately in the next - # scheduling step. - # 3. If some tokens (e.g. spec tokens) are rejected later, the number of - # computed tokens will be adjusted in update_from_output. - for req_id, num_scheduled_token in num_scheduled_tokens.items(): - self.requests[req_id].num_computed_tokens += num_scheduled_token - - self.finished_req_ids = set() # type: ignore - return scheduler_output - - def _check_watermark_for_prefill(self, - request, - num_new_tokens, - computed_blocks, - watermark=0.01): - computed_blocks = computed_blocks or [] - watermark_blocks = self.kv_cache_config.num_blocks * watermark - num_computed_tokens = (request.num_computed_tokens + - len(computed_blocks) * self.block_size) - num_required_blocks = cdiv(num_new_tokens + num_computed_tokens, - self.block_size) - req_blocks = self.kv_cache_manager.coordinator.get_blocks( - request.request_id) - num_new_blocks = (num_required_blocks - len(req_blocks[0]) - - len(computed_blocks)) - num_evictable_computed_blocks = sum(1 for blk in computed_blocks - if blk.ref_cnt == 0) - # If number of free blocks is less than water mark after allocating, don't allocate. - if (self.kv_cache_manager.block_pool.get_num_free_blocks() - - num_evictable_computed_blocks - - num_new_blocks) < watermark_blocks: - return False - return True - - def _get_prompt_limit(self, request: Request) -> int: - if (self.scheduler_config.chunked_prefill_enabled - and not self.scheduler_config.is_multi_step): - prompt_limit = self.vllm_config.model_config.max_model_len - else: - prompt_limit = min( - self.vllm_config.model_config.max_model_len, - self.scheduler_config.max_num_batched_tokens, - ) - - # Model is fine tuned with long context. Return the fine tuned max_len. - if request.lora_request and request.lora_request.long_lora_max_len: - assert prompt_limit <= request.lora_request.long_lora_max_len - return request.lora_request.long_lora_max_len - else: - return prompt_limit - - def finish_requests( - self, - request_ids: Union[str, Iterable[str]], - finished_status: RequestStatus, - ) -> None: - """Handles the finish signal from outside the scheduler. - - For example, the API server can abort a request when the client - disconnects. - """ - for req_id in request_ids: - request = self.requests.get(req_id) - if request is None: - # Invalid request ID. - continue - if request.status == RequestStatus.RUNNING: - self.scheduled_req_ids.discard(request.request_id) - super().finish_requests(request_ids, finished_status) - - def update_from_output( - self, - scheduler_output: SchedulerOutput, - model_runner_output: ModelRunnerOutput, - ) -> EngineCoreOutputs: - num_scheduled_tokens = scheduler_output.num_scheduled_tokens - - # NOTE(woosuk): As len(self.running) can be up to 1K or more, the below - # loop can be a performance bottleneck. We should do our best to avoid - # expensive operations inside the loop. - for request in self.running: - req_id = request.request_id - num_tokens_scheduled = num_scheduled_tokens.get(req_id, 0) - if num_tokens_scheduled == 0: - # The request was not scheduled in this step. - continue - if req_id in self.scheduled_req_ids: - self.scheduled_req_ids.remove(req_id) - - return super().update_from_output(scheduler_output, - model_runner_output) diff --git a/vllm_ascend/platform.py b/vllm_ascend/platform.py index 7cc84fc6..c7bbe390 100644 --- a/vllm_ascend/platform.py +++ b/vllm_ascend/platform.py @@ -153,7 +153,6 @@ class NPUPlatform(Platform): model_config = vllm_config.model_config parallel_config = vllm_config.parallel_config cache_config = vllm_config.cache_config - ascend_scheduler_config = ascend_config.ascend_scheduler_config kv_cache_dtype = vllm_config.additional_config.get( "kv_cache_dtype", None) @@ -291,35 +290,23 @@ class NPUPlatform(Platform): if cache_config: if cache_config.block_size is None: cache_config.block_size = 128 - - if cache_config.enable_prefix_caching or \ - not ascend_scheduler_config.enabled or \ - getattr(ascend_scheduler_config, "enable_chunked_prefill", False): - logger.warning( - "If chunked prefill or prefix caching is enabled, block size must be set to 128." - ) - origin_block_size = cache_config.block_size - cache_config.block_size = 128 - # TODO(MengqingCao): Remove the model_type check, after resolving the hidden error in get_kv_cache_groups. - if model_config and model_config.hf_config.model_type == "qwen3_next": - logger.warning( - "When running qwen3-next model, block_size needs to be restored to its original value." - ) - cache_config.block_size = origin_block_size + # ignore block size check if model is qwen3-next + # TODO(MengqingCao): Remove the model_type check, after resolving the hidden error in get_kv_cache_groups. + if not (model_config + and model_config.hf_config.model_type == "qwen3_next"): + # we must set block size to 128 if prefix caching is enabled or chunked prefill is enabled + if cache_config.enable_prefix_caching or \ + (vllm_config.scheduler_config and vllm_config.scheduler_config.enable_chunked_prefill): + if cache_config.block_size != 128: + logger.warning( + "block size must be set to 128 on NPU platform.") + cache_config.block_size = 128 # Activate custom ops for v1, except on 310P if get_ascend_device_type() != AscendDeviceType._310P: compilation_config.custom_ops = ["all"] - # If ascend_scheduler_config is enabled, - # extents original scheduler_config to use AscendScheduler. - if ascend_config.ascend_scheduler_config.enabled: - from vllm_ascend.core.schedule_config import AscendSchedulerConfig - ascend_scheduler_config = AscendSchedulerConfig.initialize_from_config( - vllm_config.scheduler_config, - ascend_config.ascend_scheduler_config) - vllm_config.scheduler_config = ascend_scheduler_config - elif ascend_config.recompute_scheduler_enable: + if ascend_config.recompute_scheduler_enable: from vllm_ascend.core.recompute_schedule_config import \ RecomputeSchedulerConfig recompute_scheduler_config = RecomputeSchedulerConfig.initialize_from_config( diff --git a/vllm_ascend/profiling_config.py b/vllm_ascend/profiling_config.py index b6825933..8e0dfadf 100644 --- a/vllm_ascend/profiling_config.py +++ b/vllm_ascend/profiling_config.py @@ -44,11 +44,6 @@ SERVICE_PROFILING_SYMBOLS_YAML = """ handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:schedule name: batchFrameworkProcessing -- symbol: vllm_ascend.core.scheduler:AscendScheduler.schedule - min_version: "0.9.1" - handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:schedule - name: batchFrameworkProcessing - - symbol: vllm.v1.core.sched.scheduler:Scheduler._free_request min_version: "0.9.1" handler: msserviceprofiler.vllm_profiler.vllm_v1.batch_hookers:free_request diff --git a/vllm_ascend/torchair/torchair_attention.py b/vllm_ascend/torchair/torchair_attention.py index 16fcb385..086a4dff 100644 --- a/vllm_ascend/torchair/torchair_attention.py +++ b/vllm_ascend/torchair/torchair_attention.py @@ -451,8 +451,7 @@ class AscendAttentionTorchairBackendImpl(AttentionImpl): else: raise NotImplementedError( "Torchair graph mode with non-MLA attention backend is still experimental." - "v1 scheduler(chunked prefill) is not supported at this moment. Please" - "setting 'ascend_scheduler_config':{'enabled':true} in additional_config" - "to use ascend scheduler.") + "v1 scheduler(chunked prefill) is not supported at this moment. " + ) return output.view(num_tokens, self.hidden_size) diff --git a/vllm_ascend/worker/model_runner_v1.py b/vllm_ascend/worker/model_runner_v1.py index 2e7c4ea2..40419265 100644 --- a/vllm_ascend/worker/model_runner_v1.py +++ b/vllm_ascend/worker/model_runner_v1.py @@ -330,10 +330,6 @@ class NPUModelRunner(LoRAModelRunnerMixin): # Ascend-specific configurations self.ascend_config = get_ascend_config() - if self.ascend_config.ascend_scheduler_config.enabled: - self.chunked_prefill_enabled = self.scheduler_config.chunked_prefill_enabled - else: - self.chunked_prefill_enabled = True self.weight_prefetch_method = WeightPrefetchMethod( self.ascend_config.weight_prefetch_config) # Dump / PrecisionDebugger configuration now comes from AscendConfig @@ -1942,7 +1938,6 @@ class NPUModelRunner(LoRAModelRunnerMixin): def _build_attn_state(self, num_reqs, num_scheduled_tokens, num_valid_tokens): - ascend_config = get_ascend_config() if np.array_equal(self.seq_lens_np[:num_reqs], num_scheduled_tokens): attn_state = AscendAttentionState.PrefillNoCache # We assume it is the decode stage, where prefill occurs but only one token is not hit in cache. @@ -1959,7 +1954,7 @@ class NPUModelRunner(LoRAModelRunnerMixin): else: attn_state = AscendAttentionState.ChunkedPrefill # splitfuse - elif not ascend_config.ascend_scheduler_config.enabled or self.chunked_prefill_enabled: + elif self.scheduler_config.enable_chunked_prefill: attn_state = AscendAttentionState.ChunkedPrefill else: attn_state = AscendAttentionState.PrefillCacheHit