From da84eb2f4072f0e619ee5544f49dd0cf20b2c4b1 Mon Sep 17 00:00:00 2001
From: wangxiyuan
Date: Thu, 4 Dec 2025 14:10:28 +0800
Subject: [PATCH] Remove ascend scheduler ut (#4684)

### What this PR does / why we need it?
1. Remove the Ascend scheduler UTs (tests/ut/core/test_schedule_config.py and the deprecated TestAscendScheduler cases).
2. Remove the models UTs (tests/ut/models/__init__.py and tests/ut/models/conftest.py).
3. Move the MLA UT to ops (tests/ut/models/test_mla.py -> tests/ut/ops/test_mla.py).
4. Skip the failing UTs via the --ignore list in vllm_ascend_test_pr_light.yaml.

- vLLM version: v0.12.0

Signed-off-by: wangxiyuan
---
 .../workflows/vllm_ascend_test_pr_light.yaml  |  10 +-
 tests/ut/core/test_schedule_config.py         | 134 ----
 ...ler.py => test_scheduler_dynamic_batch.py} | 710 ------------------
 tests/ut/models/__init__.py                   |   0
 tests/ut/models/conftest.py                   | 100 ---
 tests/ut/{models => ops}/test_mla.py          |   0
 6 files changed, 6 insertions(+), 948 deletions(-)
 delete mode 100644 tests/ut/core/test_schedule_config.py
 rename tests/ut/core/{test_scheduler.py => test_scheduler_dynamic_batch.py} (50%)
 delete mode 100644 tests/ut/models/__init__.py
 delete mode 100644 tests/ut/models/conftest.py
 rename tests/ut/{models => ops}/test_mla.py (100%)

diff --git a/.github/workflows/vllm_ascend_test_pr_light.yaml b/.github/workflows/vllm_ascend_test_pr_light.yaml
index 5a84bef2..fdb765ed 100644
--- a/.github/workflows/vllm_ascend_test_pr_light.yaml
+++ b/.github/workflows/vllm_ascend_test_pr_light.yaml
@@ -135,10 +135,12 @@ jobs:
           pytest -sv --cov --cov-report=xml:unittests-coverage.xml tests/ut \
             --ignore tests/ut/torchair/models/test_torchair_deepseek_mtp.py \
             --ignore tests/ut/torchair/models/test_torchair_deepseek_v2.py \
-            --ignore tests/ut/models/test_qwen2_vl.py \
-            --ignore tests/ut/models/test_qwen2_5_vl.py \
-            --ignore tests/ut/models/test_qwen2_5_vl_without_padding.py \
-            --ignore tests/ut/model_loder
+            --ignore tests/ut/model_loader/netloader/test_netloader_elastic.py \
+            --ignore tests/ut/kv_connector/test_remote_prefill_lifecycle.py \
+            --ignore tests/ut/kv_connector/test_remote_decode_lifecycle.py \
+            --ignore tests/ut/kv_connector/test_llmdatadist_connector.py \
+            --ignore tests/ut/ops/test_linear.py \
+            --ignore tests/ut/core/test_scheduler_dynamic_batch.py

       - name: Upload coverage to Codecov
         # only upload coverage when commits merged
diff --git a/tests/ut/core/test_schedule_config.py b/tests/ut/core/test_schedule_config.py
deleted file mode 100644
index 032a1a87..00000000
--- a/tests/ut/core/test_schedule_config.py
+++ /dev/null
@@ -1,134 +0,0 @@
-#
-# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from vllm.config import SchedulerConfig
-
-from tests.ut.base import TestBase
-from vllm_ascend.core.schedule_config import AscendSchedulerConfig
-
-
-class TestAscendSchedulerConfig(TestBase):
-
-    def setUp(self):
-        self.basic_scheduler_config = SchedulerConfig(
-            max_num_batched_tokens=8192,
-            max_model_len=8192,
-            is_multimodal_model=False,
-            send_delta_data=False,
-        )
-
-    def test_initialize_from_config_with_default(self):
-        # No additional config given, check the default value here.
- ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, {}) - self.assertEqual(ascend_config.enable_chunked_prefill, False) - self.assertEqual(ascend_config.policy, "fcfs") - self.assertEqual(ascend_config.scheduler_cls, - "vllm_ascend.core.scheduler.AscendScheduler") - self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - - def test_initialize_from_config_with_override(self): - # test override - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=False, - policy="fcfs", - scheduler_cls="vllm_ascend.core.scheduler.AscendScheduler", - max_num_batched_tokens=8192, - max_model_len=2048, - max_long_partial_prefills=1, - long_prefill_token_threshold=512, - ), - ) - self.assertEqual(ascend_config.enable_chunked_prefill, False) - self.assertEqual(ascend_config.policy, "fcfs") - self.assertEqual(ascend_config.scheduler_cls, - "vllm_ascend.core.scheduler.AscendScheduler") - self.assertEqual(ascend_config.max_num_batched_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - self.assertEqual(ascend_config.max_long_partial_prefills, 1) - self.assertEqual(ascend_config.long_prefill_token_threshold, 512) - - def test_not_implemented_policy(self): - with self.assertRaises(NotImplementedError) as context: - AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - policy="custom_policy", - max_num_batched_tokens=8192, - max_model_len=2048, - ), - ) - self.assertIn( - "currently AscendScheduler only supports fcfs policy", - str(context.exception), - ) - - def test_no_override(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, {}) - self.assertEqual(ascend_config.max_num_encoder_input_tokens, 8192) - self.assertEqual(ascend_config.encoder_cache_size, 8192) - - def test_valid_config_with_multimodal(self): - config = AscendSchedulerConfig.initialize_from_config( - SchedulerConfig(is_multimodal_model=True, - max_num_batched_tokens=8192), {}) - self.assertTrue(config.is_multimodal_model) - - def test_valid_config_with_chunked_prefill(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=True, - max_num_batched_tokens=8192, - max_model_len=8192, - ), - ) - self.assertEqual(ascend_config.max_num_batched_tokens, 8192) - self.assertEqual(ascend_config.max_model_len, 8192) - self.assertTrue(ascend_config.enable_chunked_prefill) - - def test_invalid_config_without_chunked_prefill(self): - with self.assertRaises(ValueError) as context: - AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_chunked_prefill=False, - max_num_batched_tokens=2048, - max_model_len=8192, - ), - ) - self.assertIn( - "Ascend scheduler is enabled without chunked prefill feature", - str(context.exception), - ) - self.assertIn("max_num_batched_tokens (2048)", str(context.exception)) - self.assertIn("max_model_len (8192)", str(context.exception)) - - def test_initialize_from_config_with_pd_transfer(self): - ascend_config = AscendSchedulerConfig.initialize_from_config( - self.basic_scheduler_config, - AscendSchedulerConfig( - enable_pd_transfer=True, - decode_max_num_seqs=48, - max_num_batched_tokens=8192, - max_model_len=4096, - ), - ) - 
self.assertEqual(ascend_config.enable_pd_transfer, True) - self.assertEqual(ascend_config.decode_max_num_seqs, 48) diff --git a/tests/ut/core/test_scheduler.py b/tests/ut/core/test_scheduler_dynamic_batch.py similarity index 50% rename from tests/ut/core/test_scheduler.py rename to tests/ut/core/test_scheduler_dynamic_batch.py index a24037b4..8f38c18f 100644 --- a/tests/ut/core/test_scheduler.py +++ b/tests/ut/core/test_scheduler_dynamic_batch.py @@ -3,7 +3,6 @@ from typing import Any, Dict, List, Optional, Tuple from unittest.mock import MagicMock, patch -import pytest import torch from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig, SchedulerConfig, SpeculativeConfig, VllmConfig) @@ -21,7 +20,6 @@ from vllm.v1.request import Request, RequestStatus from vllm.v1.structured_output import StructuredOutputManager from tests.ut.base import TestBase -from vllm_ascend.core.scheduler import AscendScheduler from vllm_ascend.core.scheduler_dynamic_batch import SchedulerDynamicBatch EOS_TOKEN_ID = 50256 @@ -96,714 +94,6 @@ def make_output(scheduler): return modelrunner_output -@pytest.mark.skip("Ascend Scheduler has been deprecated") -class TestAscendScheduler(TestBase): - - @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) - @patch("vllm.config.VllmConfig.__post_init__", MagicMock()) - @patch('vllm.v1.core.sched.scheduler.compute_encoder_budget') - def create_scheduler(self, mock_compute_encoder_budget): - mock_compute_encoder_budget.return_value = [100, 100] - use_kv_connector = False - block_size = 16 - - scheduler_config = SchedulerConfig( - max_num_seqs=16, - max_model_len=MAX_NUM_BATCHED_TOKENS, - long_prefill_token_threshold=LONG_PREFILL_TOKEN_THRESHOLD, - disable_chunked_mm_input=False, - enable_chunked_prefill=ENABLE_CHUNKED_PREFILL, - max_num_batched_tokens=MAX_NUM_BATCHED_TOKENS, - ) - - scheduler_config.max_num_encoder_input_tokens = 10000 - scheduler_config.encoder_cache_size = 10000 - scheduler_config.chunked_prefill_enabled = False - - model_config = ModelConfig( - model=MODEL, - task="auto", - tokenizer=MODEL, - tokenizer_mode="auto", - trust_remote_code=True, - dtype="float16", - seed=42, - max_model_len=MAX_NUM_BATCHED_TOKENS, - ) - model_config.pooler_config = MagicMock() - model_config.multimodal_config = MagicMock() - model_config.hf_config = MagicMock() - model_config.hf_config.is_encoder_decoder = False - # Cache config, optionally force APC - kwargs_cache: Dict[str, - Any] = ({} if ENABLE_PREFIX_CACHING is None else { - 'enable_prefix_caching': - ENABLE_PREFIX_CACHING - }) - cache_config = CacheConfig( - block_size=block_size, - gpu_memory_utilization=0.9, - swap_space=0, - cache_dtype="auto", - **kwargs_cache, - ) - - kv_transfer_config = KVTransferConfig( - kv_connector="SharedStorageConnector", - kv_role="kv_both", - kv_connector_extra_config={"shared_storage_path": "local_storage"}, - ) if use_kv_connector else None - - speculative_config: Optional[SpeculativeConfig] = None - if NUM_SPECULATIVE_TOKENS is not None: - speculative_config = SpeculativeConfig( - model="ngram", num_speculative_tokens=NUM_SPECULATIVE_TOKENS) - - vllm_config = VllmConfig( - scheduler_config=scheduler_config, - model_config=model_config, - cache_config=cache_config, - kv_transfer_config=kv_transfer_config, - speculative_config=speculative_config, - ) - - kv_cache_config = KVCacheConfig( - num_blocks=10000, # A large number of blocks to hold all requests - kv_cache_tensors=[], - kv_cache_groups=[ - KVCacheGroupSpec(['layer'], - FullAttentionSpec(block_size, 1, 1, - 
torch.float32, False, - False)) - ], - ) - cache_config.num_gpu_blocks = 10000 - - scheduler = AscendScheduler( - vllm_config=vllm_config, - kv_cache_config=kv_cache_config, - log_stats=True, - block_size=block_size, - structured_output_manager=MagicMock(spec=StructuredOutputManager), - ) - - should_advance = MagicMock() - should_advance.return_value = False - scheduler.structured_output_manager.should_advance = should_advance - - return scheduler - - def test_add_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - - for i, request in enumerate(requests): - scheduler.add_request(request) - self.assertIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), i + 1) - - def test_finish_request(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_ABORTED) - self.assertNotIn(request.request_id, scheduler.requests) - self.assertEqual(len(scheduler.waiting), 9 - i) - - def test_get_num_unfinished_requests(self): - scheduler = self.create_scheduler() - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - for i, request in enumerate(requests): - scheduler.finish_requests(request.request_id, - RequestStatus.FINISHED_STOPPED) - self.assertEqual(scheduler.get_num_unfinished_requests(), - len(requests) - i - 1) - - def test_schedule(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. - for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_schedule_multimodal_requests(self): - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - mm_positions = [[PlaceholderRange(offset=i, length=10)] - for i in range(10)] - requests = create_requests( - num_requests=10, - mm_positions=mm_positions, - ) - for request in requests: - scheduler.add_request(request) - - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - for req_id, num_tokens in output.num_scheduled_tokens.items(): - assert num_tokens == len(requests[int(req_id)].prompt_token_ids) - - # Verify all requests are scheduled. 
- for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - self.assertEqual(len(output.scheduled_encoder_inputs), len(requests)) - for req_id, encoder_input in output.scheduled_encoder_inputs.items(): - assert len(encoder_input) == 1 - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_concurrent_partial_prefills_schedule(self): - '''Test concurrent partial prefills scheduling. - total requests = 10, every request has 10 token. - while set long_prefill_token_threshold = 1, scheduler can - only schedule max_long_partial_prefills long request. - ''' - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - scheduler.scheduler_config.max_long_partial_prefills = 2 - scheduler.scheduler_config.long_prefill_token_threshold = 1 - requests = create_requests(num_requests=10, num_tokens=20) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), - scheduler.scheduler_config.max_long_partial_prefills) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - - def test_schedule_enable_prefix_caching(self): - '''Test scheduling. - Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs - ''' - global ENABLE_PREFIX_CACHING - ENABLE_PREFIX_CACHING = True - global PROMPT_LOGPROBS - PROMPT_LOGPROBS = 5 - scheduler = self.create_scheduler() - scheduler.scheduler_config.chunked_prefill_enabled = False - requests = create_requests(num_requests=10) - for request in requests: - scheduler.add_request(request) - - # Test initial scheduling - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(output.finished_req_ids), 0) - # Verify all requests are scheduled. 
- for req_id, num_tokens in output.num_scheduled_tokens.items(): - self.assertEqual(num_tokens, - len(requests[int(req_id)].prompt_token_ids)) - - # Verify requests moved from waiting to running - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), len(requests)) - for i, request in enumerate(requests): - self.assertEqual(scheduler.running[i], request) - - def test_stop_via_update_from_output(self): - """Test stopping behavior through update_from_output""" - global NUM_SPECULATIVE_TOKENS - NUM_SPECULATIVE_TOKENS = 1 - scheduler = self.create_scheduler() - - # Test case 1: Stop on EOS token - requests = create_requests(num_requests=2, max_tokens=10) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 1, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [], - requests[1].request_id: [10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[[EOS_TOKEN_ID], [10, 11] - ], # First request hits EOS, second continues - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped, second continues - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, RequestStatus.FINISHED_STOPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [EOS_TOKEN_ID]) - self.assertEqual(list(requests[1].output_token_ids), [10, 11]) - - # Test case 2: Stop on custom stop token - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, - max_tokens=10, - stop_token_ids=[42, 43]) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 2 - }, - total_num_scheduled_tokens=5, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 42], - requests[1].request_id: [13] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[[10, 42, 12], - [13, 14]], # First request hits stop token - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped on custom token - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, 
RequestStatus.FINISHED_STOPPED) - self.assertEqual(requests[0].stop_reason, 42) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 42]) - self.assertEqual(list(requests[1].output_token_ids), [13, 14]) - - # Test case 3: Stop on max tokens - NUM_SPECULATIVE_TOKENS = 2 - scheduler = self.create_scheduler() - requests = create_requests(num_requests=2, max_tokens=2) - for req in requests: - req.num_computed_tokens = req.num_tokens - scheduler.requests[req.request_id] = req - scheduler.running.append(req) - req.status = RequestStatus.RUNNING - - scheduler_output = SchedulerOutput(scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={ - requests[0].request_id: 3, - requests[1].request_id: 1 - }, - total_num_scheduled_tokens=4, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: - [10, 11], - requests[1].request_id: [] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[req.request_id for req in requests], - req_id_to_index={ - req.request_id: i - for i, req in enumerate(requests) - }, - sampled_token_ids=[[10, 11, 12], - [13]], # First request exceeds max_tokens - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - scheduler.update_from_output(scheduler_output, model_output) - - # Verify first request stopped due to length - self.assertEqual(len(scheduler.running), 1) - self.assertEqual(scheduler.running[0].request_id, - requests[1].request_id) - self.assertEqual(requests[0].status, - RequestStatus.FINISHED_LENGTH_CAPPED) - self.assertIn(requests[0].request_id, scheduler.finished_req_ids) - self.assertEqual(list(requests[0].output_token_ids), [10, 11]) - self.assertEqual(list(requests[1].output_token_ids), [13]) - - # Test case 4: Ignore EOS flag - scheduler = self.create_scheduler() - requests = create_requests(num_requests=1, max_tokens=10) - requests[0].sampling_params.ignore_eos = True - requests[0].num_computed_tokens = requests[0].num_tokens - scheduler.requests[requests[0].request_id] = requests[0] - scheduler.running.append(requests[0]) - - scheduler_output = SchedulerOutput( - scheduled_new_reqs=[], - scheduled_cached_reqs=[], - num_scheduled_tokens={requests[0].request_id: 3}, - total_num_scheduled_tokens=3, - scheduled_encoder_inputs={}, - scheduled_spec_decode_tokens={ - requests[0].request_id: [EOS_TOKEN_ID, 10] - }, - num_common_prefix_blocks=0, - finished_req_ids=set(), - free_encoder_mm_hashes=[]) - model_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[[EOS_TOKEN_ID, 10, 11]], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output, model_output) - - # Verify request continues past EOS - self.assertEqual(len(scheduler.running), 1) - self.assertFalse(requests[0].is_finished()) - self.assertEqual(list(requests[0].output_token_ids), - [EOS_TOKEN_ID, 10, 11]) - - def test_schedule_concurrent_batches(self): - global MAX_NUM_BATCHED_TOKENS - global ENABLE_PREFIX_CACHING - global ENABLE_CHUNKED_PREFILL - global MAX_NUM_SEQS - global PROMPT_LOGPROBS - ENABLE_PREFIX_CACHING = None - MAX_NUM_BATCHED_TOKENS = 1024 - MAX_NUM_SEQS = 2 - ENABLE_CHUNKED_PREFILL = True - PROMPT_LOGPROBS = None - - enable_prefix_caching_list = [None, True] - prompt_logprobs_list = [None, 5] - - for i in range(len(enable_prefix_caching_list)): - 
ENABLE_PREFIX_CACHING = enable_prefix_caching_list[i] - PROMPT_LOGPROBS = prompt_logprobs_list[i] - scheduler = self.create_scheduler() - requests = create_requests( - num_requests=2, - num_tokens=512, - ) - - # Schedule the first request. - scheduler.add_request(requests[0]) - scheduler_output0 = scheduler.schedule() - self.assertEqual(len(scheduler_output0.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output0.num_scheduled_tokens[requests[0].request_id], - 512) - - # The first request is still running, so only schedule the second request. - scheduler.add_request(requests[1]) - scheduler_output1 = scheduler.schedule() - self.assertEqual(len(scheduler_output1.scheduled_new_reqs), 1) - self.assertEqual( - scheduler_output1.num_scheduled_tokens[requests[1].request_id], - 512) - - # Model output of the first request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[0].request_id], - req_id_to_index={requests[0].request_id: 0}, - sampled_token_ids=[[0]], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output0, - model_runner_output) - - # Schedule the next step. - # The first request can be scheduled again while the second - # request is still running. - scheduler.schedule() - # Model output of the second request. - model_runner_output = ModelRunnerOutput( - req_ids=[requests[1].request_id], - req_id_to_index={requests[1].request_id: 0}, - sampled_token_ids=[[0]], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - scheduler.update_from_output(scheduler_output1, - model_runner_output) - - def test_schedule_spec_decoding_stats(self): - """Test scheduling behavior with speculative decoding. - - This test verifies that: - 1. Speculated tokens get scheduled correctly - 2. 
Spec decoding stats properly count number of draft and accepted tokens - """ - spec_tokens_list: List[List[List[int]]] = [[[1, 2, 3]], [[1, 2, 3]], - [[1, 2], [3]], [[1]], [[]], - [[1, 2, 3], [4, 5, 6]]] - output_tokens_list: List[List[List[int]]] = [[[1, 2, 3, 4]], [[1, 5]], - [[1, 2, 5], [3, 4]], - [[1, 2]], [[5]], - [[1, 2, 7], [4, 8]]] - expected_list: List[Tuple[int, int, - int, List[int]]] = [(1, 3, 3, [1, 1, 1]), - (1, 3, 1, [1, 0, 0]), - (2, 3, 3, [2, 1]), - (1, 1, 1, [1]), - (0, 0, 0, [0]), - (2, 6, 3, [2, 1, 0])] - - global NUM_SPECULATIVE_TOKENS - for idx in range(len(spec_tokens_list)): - spec_tokens = spec_tokens_list[idx] - output_tokens = output_tokens_list[idx] - expected = expected_list[idx] - num_spec_tokens = max(1, max(len(t) for t in spec_tokens)) - NUM_SPECULATIVE_TOKENS = num_spec_tokens - scheduler = self.create_scheduler() - requests = create_requests(num_requests=len(spec_tokens), - num_tokens=1) - req_ids = [] - req_to_index = {} - for i, request in enumerate(requests): - scheduler.add_request(request) - req_ids.append(request.request_id) - req_to_index[request.request_id] = i - - # Schedule a decode, which will also draft speculative tokens - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), len(requests)) - self.assertEqual(output.total_num_scheduled_tokens, len(requests)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], 1) - self.assertNotIn(req_id, output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=[[0] for _ in range(len(requests))], - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - draft_token_ids = DraftTokenIds(req_ids, spec_tokens) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - scheduler.update_draft_token_ids(draft_token_ids) - - for i in range(len(requests)): - running_req = scheduler.running[i] - # The prompt token - self.assertEqual(running_req.num_computed_tokens, 1) - # The prompt token and the sampled token - self.assertEqual(running_req.num_tokens, 2) - # The prompt token, the sampled token, and the speculated tokens - self.assertEqual(running_req.num_tokens_with_spec, - 2 + len(spec_tokens[i])) - - # No draft or accepted tokens counted yet - self.assertTrue( - not engine_core_outputs - or (engine_core_outputs[0].scheduler_stats.spec_decoding_stats - is None)) - - # Schedule the speculated tokens for validation - output = scheduler.schedule() - self.assertEqual(len(output.scheduled_new_reqs), 0) - # The sampled token and speculated tokens - self.assertEqual( - output.total_num_scheduled_tokens, - len(requests) + sum(len(ids) for ids in spec_tokens)) - for i in range(len(requests)): - req_id = requests[i].request_id - self.assertEqual(output.num_scheduled_tokens[req_id], - 1 + len(spec_tokens[i])) - if spec_tokens[i]: - self.assertEqual( - len(output.scheduled_spec_decode_tokens[req_id]), - len(spec_tokens[i])) - else: - self.assertNotIn(req_id, - output.scheduled_spec_decode_tokens) - - model_runner_output = ModelRunnerOutput( - req_ids=req_ids, - req_id_to_index=req_to_index, - sampled_token_ids=output_tokens, - logprobs=None, - prompt_logprobs_dict={}, - pooler_output=[]) - - engine_core_outputs = scheduler.update_from_output( - output, model_runner_output) - - scheduler_stats = engine_core_outputs[0].scheduler_stats \ - if engine_core_outputs else None - if expected[0] == 0: - 
self.assertIsNone(scheduler_stats.spec_decoding_stats) - else: - self.assertIsNotNone(scheduler_stats.spec_decoding_stats) - stats = scheduler_stats.spec_decoding_stats - self.assertEqual(stats.num_drafts, expected[0]) - self.assertEqual(stats.num_draft_tokens, expected[1]) - self.assertEqual(stats.num_accepted_tokens, expected[2]) - self.assertEqual(stats.num_accepted_tokens_per_pos, - expected[3]) - - def assert_scheduler_empty(self, scheduler): - """Confirm the scheduler is "empty" - i.e. no leaks.""" - # Scheduler Metadata. - scheduler = self.create_scheduler() - self.assertEqual(len(scheduler.requests), 0) - self.assertEqual(len(scheduler.waiting), 0) - self.assertEqual(len(scheduler.running), 0) - self.assertEqual(len(scheduler.finished_req_ids), 0) - - # EncoderCacheManager. - self.assertEqual(len(scheduler.encoder_cache_manager.freed), 0) - self.assertEqual(len(scheduler.encoder_cache_manager.cached), 0) - - # KVCache Manager. - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - req_to_blocks), 0) - self.assertEqual( - len(scheduler.kv_cache_manager.coordinator.single_type_managers[0]. - num_cached_block), 0) - num_free_blocks = (scheduler.kv_cache_manager.block_pool. - free_block_queue.num_free_blocks) - self.assertEqual( - num_free_blocks, - scheduler.kv_cache_manager.block_pool.num_gpu_blocks - 1) - - # NOTE(rob): just the ref count on blocks will be 0. The hash - # value, etc will remain since we lazily evict for prefix cache. - for block in scheduler.kv_cache_manager.block_pool.blocks: - self.assertEqual(block.ref_cnt, 0) - - def test_memory_leak(self): - """Test that we do not have a memory leak.""" - scheduler = self.create_scheduler() - NUM_REQUESTS = 5 - NUM_TOKENS = 10 - MAX_TOKENS = 10 - requests = create_requests(num_requests=NUM_REQUESTS, - num_tokens=NUM_TOKENS, - max_tokens=MAX_TOKENS) - - # Add each request. - for request in requests: - scheduler.add_request(request) - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Iterate until done. - while True: - scheduler_output = scheduler.schedule() - if len(scheduler.running) == 0: - break - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - - # Confirm no memory leak. 
- self.assert_scheduler_empty(scheduler) - - def test_scheduler_with_pd_transfer(self): - scheduler = self.create_scheduler() - scheduler.phase = "prefill" - requests = create_requests(num_requests=32) - for request in requests: - scheduler.add_request(request) - - # 1st iteration, move 16 requests from waiting to running for prefill - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - first_iter_prefilled_req_num = len(scheduler.running) - self.assertEqual(len(scheduler_output.scheduled_new_reqs), - scheduler.max_num_running_reqs) - self.assertEqual(scheduler_output.scheduled_cached_reqs.num_reqs, 0) - self.assertEqual(len(scheduler_output.finished_req_ids), 0) - - # 2nd iteration, move 16 prefilled requests to finished_prefill_reqs - # and move 16 requests from waiting to running for prefill - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - self.assertEqual(len(scheduler.finished_prefill_reqs), - first_iter_prefilled_req_num) - - # 3rd iteration, all requests prefilled, change scheduler phase to decode - scheduler_output = scheduler.schedule() - model_runner_output = make_output(scheduler) - scheduler.update_from_output(scheduler_output, model_runner_output) - self.assertEqual(scheduler.phase, "decode") - - class TestSchedulerDynamicBatch(TestBase): @patch("vllm.config.ModelConfig.__post_init__", MagicMock()) diff --git a/tests/ut/models/__init__.py b/tests/ut/models/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/ut/models/conftest.py b/tests/ut/models/conftest.py deleted file mode 100644 index faad7ff6..00000000 --- a/tests/ut/models/conftest.py +++ /dev/null @@ -1,100 +0,0 @@ -from types import SimpleNamespace -from unittest.mock import MagicMock, Mock, patch - -import pytest -import torch -from transformers import PretrainedConfig -from vllm.config import CacheConfig, EPLBConfig, ParallelConfig -from vllm.distributed.parallel_state import GroupCoordinator - - -@pytest.fixture -def base_config(): - config = PretrainedConfig( - hidden_size=128, - num_attention_heads=8, - num_hidden_layers=2, - intermediate_size=256, - hidden_act="silu", - rms_norm_eps=1e-6, - rope_theta=10000.0, - max_position_embeddings=2048, - n_routed_experts=4, - n_shared_experts=1, - moe_intermediate_size=256, - num_experts_per_tok=2, - routed_scaling_factor=1.0, - first_k_dense_replace=0, - moe_layer_freq=1, - kv_lora_rank=16, - qk_nope_head_dim=16, - qk_rope_head_dim=16, - v_head_dim=32, - topk_method="noaux_tc", - scoring_func="softmax", - norm_topk_prob=True, - n_group=1, - topk_group=1, - vocab_size=10000, - ) - return config - - -@pytest.fixture -def vllm_config(base_config): - model_config = SimpleNamespace( - hf_config=base_config, - tensor_parallel_size=1, - dtype=torch.float32, - use_mla=True, - quant_config=None, - max_model_len=2048, - ) - parallel_config = MagicMock(spec=ParallelConfig) - eplb_config = MagicMock(spec=EPLBConfig) - eplb_config.num_redundant_experts = 0 - parallel_config.eplb_config = eplb_config - - cache_config = CacheConfig() - vllm_config = Mock() - vllm_config.model_config = model_config - vllm_config.cache_config = cache_config - vllm_config.quant_config = None - vllm_config.parallel_config = parallel_config - return vllm_config - - -@pytest.fixture -def mock_distributed(): - tp_group = Mock(spec=GroupCoordinator) - 
tp_group.rank_in_group = 0 - tp_group.world_size = 1 - tp_group.device_group = Mock() - - dp_group = Mock(spec=GroupCoordinator) - dp_group.rank_in_group = 0 - dp_group.world_size = 1 - - ep_group = Mock(spec=GroupCoordinator) - ep_group.rank_in_group = 0 - ep_group.world_size = 1 - ep_group.device_group = Mock() - ep_group.device_group.rank.return_value = 0 - ep_group.device_group.size.return_value = 1 - - pp_group = Mock(spec=GroupCoordinator) - pp_group.rank_in_group = 0 - pp_group.world_size = 1 - - mock_vllm_config = Mock() - mock_vllm_config.scheduler_config = Mock(max_num_seqs=256) - mock_vllm_config.model_config = Mock(max_model_len=2048, quant_config=None) - - with patch("vllm_ascend.ops.fused_moe.fused_moe.get_current_vllm_config", return_value=mock_vllm_config), \ - patch("vllm_ascend.ops.fused_moe.token_dispatcher.torch.distributed.get_rank", return_value=0), \ - patch("vllm_ascend.ops.fused_moe.token_dispatcher.get_ascend_device_type", return_value=None), \ - patch.dict("vllm.distributed.parallel_state.__dict__", _TP=tp_group, _EP=ep_group, _DP=dp_group, - _PP=pp_group), \ - patch.dict("vllm_ascend.distributed.parallel_state.__dict__", _MC2=ep_group), \ - patch("torch.npu.current_device", return_value=0): - yield diff --git a/tests/ut/models/test_mla.py b/tests/ut/ops/test_mla.py similarity index 100% rename from tests/ut/models/test_mla.py rename to tests/ut/ops/test_mla.py