Files
xc-llm-ascend/tests/ut/worker/test_worker_v1.py
Icey c5fe179cef [0.11.0] [Cherry-pick #4058] Fixes Qwen3-Next enable nz accuracy problem (#4056)
### What this PR does / why we need it?
- Fixes Qwen3-Next enable nz accuracy problem

---------

Signed-off-by: wxsIcey <1790571317@qq.com>
Signed-off-by: Icey <1790571317@qq.com>
2025-11-10 20:56:39 +08:00

1241 lines
52 KiB
Python

import os
import unittest
from unittest.mock import MagicMock, patch
import torch
from vllm.config import CacheConfig, ModelConfig, ParallelConfig, VllmConfig
from tests.ut.base import TestBase
class TestNPUWorker(TestBase):
def setUp(self):
    """Create the mocked vLLM configuration and worker arguments shared by the tests."""
    # KV-cache configuration
    cache_cfg = MagicMock(spec=CacheConfig)
    cache_cfg.cache_dtype = "auto"

    # Hugging Face config. Deleting index_topk from the MagicMock makes later
    # lookups of that attribute raise AttributeError instead of returning an
    # auto-created child mock.
    hf_cfg = MagicMock()
    hf_cfg.model_type = "test_model"
    if hasattr(hf_cfg, 'index_topk'):
        del hf_cfg.index_topk

    # Model configuration
    model_cfg = MagicMock(spec=ModelConfig)
    model_cfg.dtype = torch.float16
    model_cfg.trust_remote_code = False
    model_cfg.hf_config = hf_cfg

    parallel_cfg = MagicMock(spec=ParallelConfig)

    # Top-level config tying the pieces together; the other sections are None
    vllm_cfg = MagicMock(spec=VllmConfig)
    vllm_cfg.cache_config = cache_cfg
    vllm_cfg.model_config = model_cfg
    vllm_cfg.parallel_config = parallel_cfg
    for unset_section in (
            "additional_config",
            "load_config",
            "scheduler_config",
            "device_config",
            "compilation_config",
    ):
        setattr(vllm_cfg, unset_section, None)

    # Publish the mocks on the test case
    self.cache_config_mock = cache_cfg
    self.model_config_mock = model_cfg
    self.hf_config_mock = hf_cfg
    self.parallel_config_mock = parallel_cfg
    self.vllm_config_mock = vllm_cfg

    # Positional worker arguments reused by the constructor tests
    self.local_rank = 0
    self.rank = 0
    self.distributed_init_method = "tcp://localhost:12345"
    self.is_driver_worker = False
@patch("vllm_ascend.utils.adapt_patch")
@patch("vllm_ascend.ops")
@patch("vllm_ascend.worker.worker_v1._register_atb_extensions")
@patch("vllm_ascend.worker.worker_v1.register_ascend_customop")
@patch("vllm_ascend.worker.worker_v1.init_ascend_config")
@patch("vllm_ascend.worker.worker_v1.init_ascend_soc_version")
@patch("vllm_ascend.worker.worker_v1.try_register_lib")
@patch("vllm.utils.init_cached_hf_modules")
@patch("vllm_ascend.worker.worker_v1.NPUWorker._init_profiler")
def test_init_npu_worker_normal_case(
    self,
    mock_init_profiler,
    mock_init_cached_hf_modules,
    mock_try_register_lib,
    mock_init_ascend_soc_version,
    mock_init_ascend_config,
    mock_register_ascend_customop,
    mock_register_atb_extensions,
    mock_ops,
    mock_adapt_patch,
):
    """Construct NPUWorker with the default mocked config and check its start-up hooks."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # The patched ops module exposes a no-op register_dummy_fusion_op
    mock_ops.register_dummy_fusion_op.return_value = None

    constructor_kwargs = {
        "vllm_config": self.vllm_config_mock,
        "local_rank": self.local_rank,
        "rank": self.rank,
        "distributed_init_method": self.distributed_init_method,
        "is_driver_worker": self.is_driver_worker,
    }
    npu_worker = NPUWorker(**constructor_kwargs)

    # Each one-off registration hook must have fired exactly once
    for hook in (
            mock_adapt_patch,
            mock_ops.register_dummy_fusion_op,
            mock_register_atb_extensions,
            mock_register_ascend_customop,
    ):
        hook.assert_called_once()
    mock_init_ascend_config.assert_called_once_with(self.vllm_config_mock)
    mock_init_ascend_soc_version.assert_called_once()

    # The optional MindIE Turbo library is registered with this exact message
    turbo_message = "MindIE Turbo is installed. vLLM inference will be accelerated with MindIE Turbo."
    mock_try_register_lib.assert_called_once_with("mindie_turbo",
                                                  turbo_message)

    # With cache_dtype == "auto" the worker is expected to report float16,
    # the dtype configured on the model config mock
    self.assertEqual(npu_worker.cache_dtype, torch.float16)
    mock_init_profiler.assert_called_once()

    # trust_remote_code is False, so the HF module cache must stay untouched
    mock_init_cached_hf_modules.assert_not_called()
@patch("vllm_ascend.utils.adapt_patch")
@patch("vllm_ascend.ops")
@patch("vllm_ascend.worker.worker_v1._register_atb_extensions")
@patch("vllm_ascend.worker.worker_v1.register_ascend_customop")
@patch("vllm_ascend.worker.worker_v1.init_ascend_config")
@patch("vllm_ascend.worker.worker_v1.init_ascend_soc_version")
@patch("vllm_ascend.worker.worker_v1.try_register_lib")
@patch("vllm.utils.init_cached_hf_modules")
@patch("vllm_ascend.worker.worker_v1.NPUWorker._init_profiler")
def test_init_npu_worker_with_trust_remote_code(
    self,
    mock_init_profiler,
    mock_init_cached_hf_modules,
    mock_try_register_lib,
    mock_init_ascend_soc_version,
    mock_init_ascend_config,
    mock_register_ascend_customop,
    mock_register_atb_extensions,
    mock_ops,
    mock_adapt_patch,
):
    """Construct NPUWorker with trust_remote_code=True and expect the HF module cache to be initialised."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Flip the flag that setUp leaves disabled
    self.model_config_mock.trust_remote_code = True
    mock_ops.register_dummy_fusion_op.return_value = None

    # Only the constructor's side effects matter, so the instance is discarded
    constructor_kwargs = {
        "vllm_config": self.vllm_config_mock,
        "local_rank": self.local_rank,
        "rank": self.rank,
        "distributed_init_method": self.distributed_init_method,
        "is_driver_worker": self.is_driver_worker,
    }
    NPUWorker(**constructor_kwargs)

    # init_cached_hf_modules must run exactly once when remote code is trusted
    mock_init_cached_hf_modules.assert_called_once()
@patch("vllm_ascend.utils.adapt_patch")
@patch("vllm_ascend.ops")
@patch("vllm_ascend.worker.worker_v1._register_atb_extensions")
@patch("vllm_ascend.worker.worker_v1.register_ascend_customop")
@patch("vllm_ascend.worker.worker_v1.init_ascend_config")
@patch("vllm_ascend.worker.worker_v1.init_ascend_soc_version")
@patch("vllm_ascend.worker.worker_v1.try_register_lib")
@patch("vllm.utils.init_cached_hf_modules")
@patch("vllm_ascend.worker.worker_v1.NPUWorker._init_profiler")
def test_init_npu_worker_with_custom_cache_dtype(
    self,
    mock_init_profiler,
    mock_init_cached_hf_modules,
    mock_try_register_lib,
    mock_init_ascend_soc_version,
    mock_init_ascend_config,
    mock_register_ascend_customop,
    mock_register_atb_extensions,
    mock_ops,
    mock_adapt_patch,
):
    """Construct NPUWorker with an explicit cache_dtype string and check the resolved torch dtype."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Request a named dtype instead of the "auto" default from setUp
    self.cache_config_mock.cache_dtype = "float32"
    mock_ops.register_dummy_fusion_op.return_value = None

    # Replace the string -> torch dtype table with a single known entry
    dtype_table = {"float32": torch.float32}
    constructor_kwargs = {
        "vllm_config": self.vllm_config_mock,
        "local_rank": self.local_rank,
        "rank": self.rank,
        "distributed_init_method": self.distributed_init_method,
        "is_driver_worker": self.is_driver_worker,
    }
    with patch("vllm.utils.STR_DTYPE_TO_TORCH_DTYPE", dtype_table):
        npu_worker = NPUWorker(**constructor_kwargs)

    # The worker must expose the dtype mapped from the configured string
    self.assertEqual(npu_worker.cache_dtype, torch.float32)
def test_initialize_cache(self):
    """initialize_cache should store both block counts on the cache config."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor so no device or config setup is needed
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        cache_cfg = MagicMock()
        npu_worker.cache_config = cache_cfg

        npu_worker.initialize_cache(100, 50)

        # First argument lands in num_gpu_blocks, second in num_cpu_blocks
        self.assertEqual(npu_worker.cache_config.num_gpu_blocks, 100)
        self.assertEqual(npu_worker.cache_config.num_cpu_blocks, 50)
@patch("vllm_ascend.worker.worker_v1.sleep_mode_enabled")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform")
@patch("vllm_ascend.worker.worker_v1.CaMemAllocator")
@patch("vllm_ascend.worker.worker_v1.logger")
def test_sleep_mode_enabled(self, mock_logger, mock_allocator_class,
                            mock_platform, mock_sleep_mode_enabled):
    """sleep(level=1) should offload weights through the allocator when sleep mode is on."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Sleep mode is available and the allocator singleton is a mock
    mock_sleep_mode_enabled.return_value = True
    allocator = MagicMock()
    mock_allocator_class.get_instance.return_value = allocator

    # Memory readings returned for the first and second mem_get_info call
    reading_before = (1000, 2000)
    reading_after = (1200, 2000)
    mock_platform.mem_get_info.side_effect = [reading_before, reading_after]

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        npu_worker.sleep(level=1)

        mock_sleep_mode_enabled.assert_called_once()
        allocator.sleep.assert_called_once_with(offload_tags=("weights", ))
        # Memory info is sampled exactly twice during sleep
        self.assertEqual(mock_platform.mem_get_info.call_count, 2)
        # A single info-level log line is emitted
        mock_logger.info.assert_called_once()
@patch("vllm_ascend.worker.worker_v1.sleep_mode_enabled")
def test_sleep_mode_disabled_raises_error(self, mock_sleep_mode_enabled):
    """sleep() must raise ValueError when sleep mode is reported as disabled."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    mock_sleep_mode_enabled.return_value = False

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        with self.assertRaises(ValueError) as raised:
            npu_worker.sleep()

        # The error text must explain why the call was rejected
        error_text = str(raised.exception)
        self.assertIn("Sleep mode is not enabled", error_text)
@patch('vllm_ascend.utils._ENABLE_NZ', False)
@patch("vllm_ascend.worker.worker_v1.sleep_mode_enabled")
@patch("vllm_ascend.worker.worker_v1.CaMemAllocator")
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"})
def test_wake_up_mode_enabled(self, mock_allocator_class,
                              mock_sleep_mode_enabled):
    """wake_up(tags=...) should forward the tags to the allocator when sleep mode is on."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Sleep mode is available and the allocator singleton is a mock
    allocator = MagicMock()
    mock_allocator_class.get_instance.return_value = allocator
    mock_sleep_mode_enabled.return_value = True

    # Bypass the real constructor; start with no saved buffers
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        npu_worker._sleep_saved_buffers = {}

        npu_worker.wake_up(tags=["test_tag"])

        mock_sleep_mode_enabled.assert_called_once()
        allocator.wake_up.assert_called_once_with(tags=["test_tag"])
@patch("vllm_ascend.worker.worker_v1.sleep_mode_enabled")
@patch.dict(os.environ, {"VLLM_ASCEND_ENABLE_NZ": "0"})
def test_wake_up_mode_disabled_raises_error(self, mock_sleep_mode_enabled):
    """wake_up() must raise ValueError when sleep mode is reported as disabled."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    mock_sleep_mode_enabled.return_value = False

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        with self.assertRaises(ValueError) as raised:
            npu_worker.wake_up()

        # The error text must explain why the call was rejected
        error_text = str(raised.exception)
        self.assertIn("Sleep mode is not enabled", error_text)
@patch(
    "vllm_ascend.worker.worker_v1.NPUWorker._init_worker_distributed_environment"
)
@patch("vllm_ascend.worker.worker_v1.NPUPlatform")
def test_init_device(self, mock_platform, mock_init_dist_env):
    """_init_device should select npu:<local_rank>, seed, record free memory and init distributed state."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Single memory reading returned by the mocked platform
    mock_platform.mem_get_info.return_value = (1000, 2000)

    # Bypass the real constructor and supply only what _init_device reads
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        npu_worker.local_rank = 1
        model_cfg = MagicMock()
        model_cfg.seed = 42
        npu_worker.model_config = model_cfg

        returned_device = npu_worker._init_device()

        # The device handed to set_device must render as "npu:1"
        mock_platform.set_device.assert_called_once()
        selected_device = mock_platform.set_device.call_args.args[0]
        self.assertEqual(str(selected_device), "npu:1")

        mock_platform.empty_cache.assert_called_once()
        mock_platform.seed_everything.assert_called_once_with(42)
        # Memory info is sampled exactly once
        mock_platform.mem_get_info.assert_called_once()
        # Distributed environment initialisation is triggered
        mock_init_dist_env.assert_called_once()

        # The same device is returned and the first reading element is stored
        self.assertEqual(str(returned_device), "npu:1")
        self.assertEqual(npu_worker.init_npu_memory, 1000)
def test_profile_start_stop(self):
    """profile(is_start=...) should start or stop the attached profiler."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and attach a mock profiler
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        profiler = MagicMock()
        npu_worker.profiler = profiler

        # Starting delegates to profiler.start()
        npu_worker.profile(is_start=True)
        profiler.start.assert_called_once()

        # Stopping delegates to profiler.stop()
        npu_worker.profile(is_start=False)
        profiler.stop.assert_called_once()
def test_profile_no_profiler_raises_error(self):
    """profile() must raise RuntimeError when the worker has no profiler."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and leave the profiler unset
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        npu_worker.profiler = None

        with self.assertRaises(RuntimeError) as raised:
            npu_worker.profile()

        error_text = str(raised.exception)
        self.assertIn("Profiler is not enabled", error_text)
# NOTE: stacked @patch decorators inject their mocks bottom-up, so the
# decorator closest to the function supplies the first mock argument.
# envs_vllm is therefore listed last to line up with mock_envs_vllm.
@patch("vllm_ascend.worker.worker_v1.envs_ascend")
@patch("vllm_ascend.worker.worker_v1.envs_vllm")
def test_profile_and_msmonitor_both_enabled_raises_error(
        self, mock_envs_vllm, mock_envs_ascend):
    """Test _init_profiler raises when both profiler and msmonitor are enabled.

    VLLM_TORCH_PROFILER_DIR is set on the patched envs_vllm module and
    MSMONITOR_USE_DAEMON on the patched envs_ascend module; _init_profiler
    is expected to reject that combination with a RuntimeError.
    """
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Enable both mutually exclusive features on their respective modules
    mock_envs_vllm.VLLM_TORCH_PROFILER_DIR = "/path/to/traces"
    mock_envs_ascend.MSMONITOR_USE_DAEMON = 1

    # Create worker mock (bypass the real constructor)
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        worker = NPUWorker()

        # Test should raise exception
        with self.assertRaises(RuntimeError) as cm:
            _ = worker._init_profiler()

        self.assertIn(
            "MSMONITOR_USE_DAEMON and VLLM_TORCH_PROFILER_DIR cannot be both set at the same time.",
            str(cm.exception))
def test_lora_methods(self):
    """The worker's LoRA methods should delegate to the model runner and relay its results."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and attach a mock model runner
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner

        # Canned results for each delegated call
        runner.add_lora.return_value = True
        runner.remove_lora.return_value = True
        runner.list_loras.return_value = {1, 2, 3}
        runner.pin_lora.return_value = True

        # add_lora forwards the request object untouched
        lora_request = MagicMock()
        added = npu_worker.add_lora(lora_request)
        self.assertTrue(added)
        runner.add_lora.assert_called_once_with(lora_request)

        # remove_lora forwards the adapter id
        removed = npu_worker.remove_lora(1)
        self.assertTrue(removed)
        runner.remove_lora.assert_called_once_with(1)

        # list_loras relays the runner's set of ids
        listed = npu_worker.list_loras()
        self.assertEqual(listed, {1, 2, 3})
        runner.list_loras.assert_called_once()

        # pin_lora forwards the adapter id
        pinned = npu_worker.pin_lora(2)
        self.assertTrue(pinned)
        runner.pin_lora.assert_called_once_with(2)
def test_get_methods(self):
    """Each worker getter should return whatever the same-named model runner method returns."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and attach a mock model runner
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner

        # Getter name -> value the runner is primed to return
        canned_results = {
            "get_model": MagicMock(),
            "get_kv_cache_spec": {"test": "spec"},
            "get_supported_pooling_tasks": ["task1", "task2"],
            "get_supported_tasks": ("task1", "task2"),
        }
        for getter_name, canned in canned_results.items():
            getattr(runner, getter_name).return_value = canned

        # The worker must relay each canned value unchanged
        for getter_name, canned in canned_results.items():
            self.assertEqual(getattr(npu_worker, getter_name)(), canned)
def test_execute_dummy_batch(self):
    """execute_dummy_batch should run one uniform-decode dummy pass on the model runner."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and supply the attributes the method reads
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        compilation_cfg = MagicMock()
        compilation_cfg.cudagraph_mode = MagicMock()
        npu_worker.compilation_config = compilation_cfg

        runner = MagicMock()
        tokens_per_request = runner.decode_token_per_req
        npu_worker.model_runner = runner

        npu_worker.execute_dummy_batch()

        # The dummy run uses the runner's decode_token_per_req as num_tokens
        runner._dummy_run.assert_called_once_with(
            num_tokens=tokens_per_request, uniform_decode=True)
@patch("vllm_ascend.worker.worker_v1.envs_vllm")
@patch("vllm_ascend.worker.worker_v1.logger")
@patch("torch_npu.profiler._ExperimentalConfig")
@patch("torch_npu.profiler.profile")
@patch("torch_npu.profiler.tensorboard_trace_handler")
@patch("torch_npu.profiler.ExportType")
@patch("torch_npu.profiler.ProfilerLevel")
@patch("torch_npu.profiler.AiCMetrics")
@patch("torch_npu.profiler.ProfilerActivity")
def test_init_profiler_enabled(
    self,
    mock_profiler_activity,
    mock_aic_metrics,
    mock_profiler_level,
    mock_export_type,
    mock_trace_handler,
    mock_profile,
    mock_experimental_config,
    mock_logger,
    mock_envs_vllm,
):
    """Exercise _init_profiler with a trace directory, stack capture and memory profiling enabled."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    trace_dir = "/path/to/traces"

    # Profiler switches on the patched envs module
    mock_envs_vllm.VLLM_TORCH_PROFILER_DIR = trace_dir
    mock_envs_vllm.VLLM_TORCH_PROFILER_WITH_STACK = True
    mock_envs_vllm.VLLM_TORCH_PROFILER_WITH_PROFILE_MEMORY = True

    # Stand-in enum members as plain strings so they compare easily
    mock_export_type.Text = "Text"
    mock_profiler_level.Level1 = "Level1"
    mock_aic_metrics.AiCoreNone = "AiCoreNone"
    mock_profiler_activity.CPU = "CPU"
    mock_profiler_activity.NPU = "NPU"

    # Objects produced by the patched torch_npu factories
    experimental_cfg = MagicMock()
    mock_experimental_config.return_value = experimental_cfg
    trace_callback = MagicMock()
    mock_trace_handler.return_value = trace_callback
    profiler_obj = MagicMock()
    mock_profile.return_value = profiler_obj

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        built_profiler = npu_worker._init_profiler()

        # One info log announces where traces are written
        mock_logger.info.assert_called_once_with(
            "Profiling enabled. Traces will be saved to: %s", trace_dir)

        # The experimental config is built once with these keyword values
        mock_experimental_config.assert_called_once()
        actual_cfg_kwargs = mock_experimental_config.call_args.kwargs
        wanted_cfg_kwargs = dict(
            export_type="Text",
            profiler_level="Level1",
            msprof_tx=False,
            aic_metrics="AiCoreNone",
            l2_cache=False,
            op_attr=False,
            data_simplification=False,
            record_op_args=False,
            gc_detect_threshold=None,
        )
        for option, wanted in wanted_cfg_kwargs.items():
            self.assertEqual(actual_cfg_kwargs[option], wanted)

        # The trace handler targets the configured directory
        mock_trace_handler.assert_called_once_with(trace_dir)

        # The profiler itself is built once from the pieces above
        mock_profile.assert_called_once()
        actual_profile_kwargs = mock_profile.call_args.kwargs
        self.assertEqual(actual_profile_kwargs["activities"], ["CPU", "NPU"])
        self.assertTrue(actual_profile_kwargs["with_stack"])
        self.assertTrue(actual_profile_kwargs["profile_memory"])
        self.assertFalse(actual_profile_kwargs["with_modules"])
        self.assertEqual(actual_profile_kwargs["experimental_config"],
                         experimental_cfg)
        self.assertEqual(actual_profile_kwargs["on_trace_ready"],
                         trace_callback)

        # _init_profiler hands back the object produced by the profile factory
        self.assertEqual(built_profiler, profiler_obj)
@patch("vllm_ascend.worker.worker_v1.envs_vllm")
def test_init_profiler_disabled(self, mock_envs_vllm):
    """_init_profiler should return None when no trace directory is configured."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # No trace directory means profiling is off
    mock_envs_vllm.VLLM_TORCH_PROFILER_DIR = None

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        built_profiler = npu_worker._init_profiler()

        self.assertIsNone(built_profiler)
@patch("vllm_ascend.worker.worker_v1.envs_vllm")
def test_init_profiler_empty_dir(self, mock_envs_vllm):
    """_init_profiler should return None when the trace directory is an empty string."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # An empty string is falsy, so it is expected to behave like "unset"
    mock_envs_vllm.VLLM_TORCH_PROFILER_DIR = ""

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()

        built_profiler = npu_worker._init_profiler()

        self.assertIsNone(built_profiler)
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.clear_npu_memory")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.empty_cache")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.mem_get_info")
@patch("torch_npu.npu.memory_stats")
@patch("torch_npu.npu.mem_get_info")
@patch("vllm_ascend.worker.worker_v1.logger")
def test_determine_available_memory_normal_case(
    self,
    mock_logger,
    mock_torch_mem_get_info,
    mock_torch_memory_stats,
    mock_platform_mem_get_info,
    mock_platform_empty_cache,
    mock_platform_clear_npu_memory,
):
    """Test determine_available_memory in the normal (successful) case.

    Note: with the mocked readings below the expected value includes a small
    non-torch allocation of 1000 (see the calculation comments at the end),
    so this is not a strictly "no non-torch allocation" scenario.
    The mocks are consumed in call order, so the order of the side_effect
    lists matters.
    """
    from vllm_ascend.worker.worker_v1 import NPUWorker
    # Setup mock - (free, total) readings returned by NPUPlatform.mem_get_info
    mock_platform_mem_get_info.side_effect = [
        (8000, 10000),  # 1st call: before profile execution
        (7000, 10000),  # 2nd call: after profile execution
    ]
    # memory_stats is consumed twice: first for the peak, then for the current value
    mock_torch_memory_stats.side_effect = [
        {
            "allocated_bytes.all.peak": 2000
        },  # peak memory
        {
            "allocated_bytes.all.current": 3000
        },  # current allocated = torch_allocated_bytes in the calculation below
    ]
    # Mock setup to simulate memory change between calls, exposing potential race condition
    # The implementation calls torch_npu.npu.mem_get_info() twice in total_allocated_bytes calculation
    # which is not atomic and can lead to incorrect memory calculations
    # NOTE(review): the implementation is not visible from this test; the
    # description above is the test author's account - confirm against worker_v1.
    mock_torch_mem_get_info.side_effect = [
        (7000, 10000),  # First call for total_allocated_bytes calculation
        (
            6000,
            10000,
        ),  # Second call for total_allocated_bytes calculation, simulating an allocation
        (6000, 10000),  # Additional calls for other parts of the method
        (6000, 10000),
    ]
    # Create worker mock (bypass the real constructor)
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        worker = NPUWorker()
        worker.init_npu_memory = (
            8500  # Initial memory greater than current free memory
        )
        worker.model_runner = MagicMock()
        worker.cache_config = MagicMock()
        worker.cache_config.gpu_memory_utilization = 0.8
        # Test determine_available_memory
        result = worker.determine_available_memory()
        # Verify call counts
        mock_platform_clear_npu_memory.assert_called_once()
        self.assertEqual(mock_platform_mem_get_info.call_count, 2)
        worker.model_runner.profile_run.assert_called_once()
        mock_platform_empty_cache.assert_called_once()
        # Verify calculation result with race condition simulation
        # Calculation logic:
        # total_allocated_bytes = torch_npu.npu.mem_get_info()[1] - torch_npu.npu.mem_get_info()[0]
        #   total comes from the first call (10000), free from the second call (6000)
        #                       = 10000 - 6000 = 4000
        #   (a single consistent reading would have given 10000 - 7000 = 3000)
        # This exposes the race condition where memory state changes between calls
        # non_torch_allocations = total_allocated_bytes - torch_allocated_bytes
        #                       = 4000 - 3000 = 1000  # Non-torch memory allocation detected
        # peak_memory = torch_peak_memory + non_torch_allocations
        #             = 2000 + 1000 = 3000
        # available = total_npu_memory * gpu_memory_utilization - peak_memory
        #           = 10000 * 0.8 - 3000 = 5000
        expected_result = max(0, int(10000 * 0.8 - 3000))
        self.assertEqual(result, expected_result)
        # Verify log output
        mock_logger.info.assert_called_once()
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.clear_npu_memory")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.empty_cache")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.mem_get_info")
@patch("torch_npu.npu.memory_stats")
@patch("torch_npu.npu.mem_get_info")
def test_determine_available_memory_with_non_torch_allocations(
    self,
    mock_torch_mem_get_info,
    mock_torch_memory_stats,
    mock_platform_mem_get_info,
    mock_platform_empty_cache,
    mock_platform_clear_npu_memory,
):
    """Test determine_available_memory with significant non-torch memory allocation.

    The mocks are consumed in call order, so the order of the side_effect
    lists matters. The expected result is derived step by step in the
    comments before the final assertion.
    """
    from vllm_ascend.worker.worker_v1 import NPUWorker
    # Setup mock - test case with large non-torch memory allocation
    # (free, total) readings returned by NPUPlatform.mem_get_info
    mock_platform_mem_get_info.side_effect = [
        (8000, 10000),  # 1st call
        (7000, 10000),  # 2nd call
    ]
    # memory_stats is consumed twice: first for the peak, then for the current value
    mock_torch_memory_stats.side_effect = [
        {
            "allocated_bytes.all.peak": 1500
        },  # peak memory
        {
            "allocated_bytes.all.current": 1000
        },  # current allocated
    ]
    # Mock setup to expose race condition in total_allocated_bytes calculation
    # Setup non-torch allocations > 0 case with memory change simulation
    mock_torch_mem_get_info.side_effect = [
        (6000, 10000),  # First call for total_allocated_bytes calculation
        (
            5000,
            10000,
        ),  # Second call for total_allocated_bytes calculation, simulating allocation
        (5000, 10000),  # Additional calls for other parts of the method
        (5000, 10000),
    ]
    # Create worker mock (bypass the real constructor)
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        worker = NPUWorker()
        worker.init_npu_memory = 8500
        worker.model_runner = MagicMock()
        worker.cache_config = MagicMock()
        worker.cache_config.gpu_memory_utilization = 0.9
        # Test determine_available_memory
        result = worker.determine_available_memory()
        # Verify result: case with large non-torch memory allocation and race condition
        # total_allocated_bytes = torch_npu.npu.mem_get_info()[1] - torch_npu.npu.mem_get_info()[0]
        #   total comes from the first call (10000), free from the second call (5000)
        #                       = 10000 - 5000 = 5000
        #   (a single consistent reading would have given 10000 - 6000 = 4000)
        # This exposes the race condition where memory allocation occurs between calls
        # non_torch_allocations = total_allocated_bytes - torch_allocated_bytes
        #                       = 5000 - 1000 = 4000  # Significant non-torch allocation detected
        # peak_memory = torch_peak_memory + non_torch_allocations
        #             = 1500 + 4000 = 5500
        # available = total_npu_memory * gpu_memory_utilization - peak_memory
        #           = 10000 * 0.9 - 5500 = 3500
        expected_result = max(0, int(10000 * 0.9 - 5500))
        self.assertEqual(result, expected_result)
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.clear_npu_memory")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.mem_get_info")
def test_determine_available_memory_memory_profiling_error(
        self, mock_platform_mem_get_info, mock_platform_clear_npu_memory):
    """determine_available_memory must fail an assertion when free memory grew during profiling."""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Second reading reports more free memory (9000) than the worker's
    # recorded initial memory (8500), which is the inconsistent case
    reading_first = (8000, 10000)
    reading_second = (9000, 10000)
    mock_platform_mem_get_info.side_effect = [reading_first, reading_second]

    # Bypass the real constructor and supply the attributes the method reads
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        npu_worker.init_npu_memory = 8500
        npu_worker.model_runner = MagicMock()
        cache_cfg = MagicMock()
        cache_cfg.gpu_memory_utilization = 0.8
        npu_worker.cache_config = cache_cfg

        with self.assertRaises(AssertionError) as raised:
            npu_worker.determine_available_memory()

        error_text = str(raised.exception)
        self.assertIn("Error in memory profiling", error_text)
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.clear_npu_memory")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.empty_cache")
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.mem_get_info")
@patch("torch_npu.npu.memory_stats")
@patch("torch_npu.npu.mem_get_info")
def test_determine_available_memory_negative_result(
    self,
    mock_torch_mem_get_info,
    mock_torch_memory_stats,
    mock_platform_mem_get_info,
    mock_platform_empty_cache,
    mock_platform_clear_npu_memory,
):
    """Test determine_available_memory returns 0 when result is negative.

    The mocks are consumed in call order, so the order of the side_effect
    lists matters. The derivation of the (negative) raw value is spelled out
    in the comments before the final assertion.
    """
    from vllm_ascend.worker.worker_v1 import NPUWorker
    # Setup mock: high peak memory causes negative available memory
    # (free, total) readings returned by NPUPlatform.mem_get_info
    mock_platform_mem_get_info.side_effect = [
        (8000, 10000),  # 1st call
        (3000, 10000),  # 2nd call
    ]
    # memory_stats is consumed twice: first for the peak, then for the current value
    mock_torch_memory_stats.side_effect = [
        {
            "allocated_bytes.all.peak": 9000
        },  # High peak memory
        {
            "allocated_bytes.all.current": 7000
        },
    ]
    # Mock setup to expose race condition even in negative result scenarios
    mock_torch_mem_get_info.side_effect = [
        (3000, 10000),  # First call for total_allocated_bytes calculation
        (
            2000,
            10000,
        ),  # Second call for total_allocated_bytes calculation, simulating more allocation
        (2000, 10000),  # Additional calls for other parts of the method
        (2000, 10000),
    ]
    # Create worker mock (bypass the real constructor)
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        worker = NPUWorker()
        worker.init_npu_memory = 8500
        worker.model_runner = MagicMock()
        worker.cache_config = MagicMock()
        worker.cache_config.gpu_memory_utilization = 0.8
        # Test determine_available_memory
        result = worker.determine_available_memory()
        # Verify result is 0 (not negative) even with race condition
        # total_allocated_bytes = torch_npu.npu.mem_get_info()[1] - torch_npu.npu.mem_get_info()[0]
        #   total comes from the first call (10000), free from the second call (2000)
        #                       = 10000 - 2000 = 8000
        #   (a single consistent reading would have given 10000 - 3000 = 7000)
        # non_torch_allocations = total_allocated_bytes - torch_allocated_bytes
        #                       = 8000 - 7000 = 1000  # Additional non-torch allocation detected
        # peak_memory = torch_peak_memory + non_torch_allocations
        #             = 9000 + 1000 = 10000
        # available = total_npu_memory * gpu_memory_utilization - peak_memory
        #           = 10000 * 0.8 - 10000 = -2000, max(0, -2000) = 0
        self.assertEqual(result, 0)
def test_execute_model_first_rank(self):
    """execute_model on a rank that is both first and last should return the runner output directly."""
    from vllm.v1.outputs import ModelRunnerOutput
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and intercept the pipeline-parallel group lookup
    with (
            patch.object(NPUWorker, "__init__", lambda _self, **_kw: None),
            patch("vllm_ascend.worker.worker_v1.get_pp_group") as
            mock_get_pp_group,
    ):
        npu_worker = NPUWorker()
        npu_worker.model_runner = MagicMock()

        # Executor backend is "ray" for this scenario
        parallel_cfg = MagicMock()
        parallel_cfg.distributed_executor_backend = "ray"
        vllm_cfg = MagicMock()
        vllm_cfg.parallel_config = parallel_cfg
        npu_worker.vllm_config = vllm_cfg

        # Pipeline group reports this rank as both first and last
        pp_group = MagicMock()
        pp_group.is_first_rank = True
        pp_group.is_last_rank = True
        mock_get_pp_group.return_value = pp_group

        # Scheduler input with one scheduled token
        scheduler_output = MagicMock()
        scheduler_output.total_num_scheduled_tokens = 1

        # The runner yields an object that passes as ModelRunnerOutput
        runner_output = MagicMock(spec=ModelRunnerOutput)
        npu_worker.model_runner.execute_model.return_value = runner_output

        returned = npu_worker.execute_model(scheduler_output)

        # No intermediate tensors are passed on the first rank
        npu_worker.model_runner.execute_model.assert_called_once_with(
            scheduler_output, None)
        self.assertEqual(returned, runner_output)
@patch("vllm_ascend.worker.worker_v1.get_pp_group")
@patch("vllm_ascend.worker.worker_v1.get_tp_group")
def test_execute_model_middle_rank(self, mock_get_tp_group,
                                   mock_get_pp_group):
    """execute_model on a middle pipeline rank should receive tensors, run, send tensors and return None."""
    from vllm.sequence import IntermediateTensors
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor
    with patch.object(NPUWorker, "__init__", lambda _self, **_kw: None):
        npu_worker = NPUWorker()
        npu_worker.model_runner = MagicMock()

        # Executor backend is "ray" for this scenario
        parallel_cfg = MagicMock()
        parallel_cfg.distributed_executor_backend = "ray"
        vllm_cfg = MagicMock()
        vllm_cfg.parallel_config = parallel_cfg
        npu_worker.vllm_config = vllm_cfg

        # Pipeline group reports this rank as neither first nor last and
        # hands over a tensor dict from the previous stage
        pp_group = MagicMock()
        pp_group.is_first_rank = False
        pp_group.is_last_rank = False
        pp_group.recv_tensor_dict.return_value = {"tensor": "data"}
        mock_get_pp_group.return_value = pp_group

        # The runner yields an IntermediateTensors-like object with no
        # kv_connector_output
        stage_output = MagicMock(spec=IntermediateTensors)
        stage_output.tensors = {"output_tensor": "data"}
        stage_output.kv_connector_output = None
        npu_worker.model_runner.execute_model.return_value = stage_output

        # Scheduler input with one scheduled token
        scheduler_output = MagicMock()
        scheduler_output.total_num_scheduled_tokens = 1

        returned = npu_worker.execute_model(scheduler_output)

        # Tensors are received from the previous stage
        pp_group.recv_tensor_dict.assert_called_once()

        # The runner is invoked once with (scheduler_output, IntermediateTensors)
        npu_worker.model_runner.execute_model.assert_called_once()
        positional = npu_worker.model_runner.execute_model.call_args.args
        self.assertEqual(positional[0], scheduler_output)
        self.assertIsInstance(positional[1], IntermediateTensors)

        # Results are forwarded to the next stage and nothing is returned
        pp_group.send_tensor_dict.assert_called_once()
        self.assertIsNone(returned)
def test_execute_model_external_launcher(self):
    """execute_model with the external_launcher backend should return the runner output even on a non-last rank."""
    from vllm.v1.outputs import ModelRunnerOutput
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Bypass the real constructor and intercept the pipeline-parallel group lookup
    with (
            patch.object(NPUWorker, "__init__", lambda _self, **_kw: None),
            patch("vllm_ascend.worker.worker_v1.get_pp_group") as
            mock_get_pp_group,
    ):
        npu_worker = NPUWorker()
        npu_worker.model_runner = MagicMock()

        # Executor backend is "external_launcher" for this scenario
        parallel_cfg = MagicMock()
        parallel_cfg.distributed_executor_backend = "external_launcher"
        vllm_cfg = MagicMock()
        vllm_cfg.parallel_config = parallel_cfg
        npu_worker.vllm_config = vllm_cfg

        # Pipeline group reports this rank as first but not last
        pp_group = MagicMock()
        pp_group.is_first_rank = True
        pp_group.is_last_rank = False
        mock_get_pp_group.return_value = pp_group

        # Scheduler input with one scheduled token
        scheduler_output = MagicMock()
        scheduler_output.total_num_scheduled_tokens = 1

        # The runner yields an object that passes as ModelRunnerOutput
        runner_output = MagicMock(spec=ModelRunnerOutput)
        npu_worker.model_runner.execute_model.return_value = runner_output

        returned = npu_worker.execute_model(scheduler_output)

        # The runner output is returned as-is
        self.assertEqual(returned, runner_output)
@patch("vllm_ascend.worker.worker_v1.CaMemAllocator")
def test_load_model_with_sleep_mode(self, mock_allocator_class):
    """Test load_model when enable_sleep_mode is True"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        config = MagicMock()
        config.model_config = MagicMock()
        config.model_config.enable_sleep_mode = True
        npu_worker.vllm_config = config

        # Allocator stub: reports zero current usage and returns a mock
        # object from use_memory_pool
        allocator = MagicMock()
        allocator.get_current_usage.return_value = 0
        allocator.use_memory_pool.return_value = MagicMock()
        mock_allocator_class.get_instance.return_value = allocator

        # Exercise the method under test
        npu_worker.load_model()

        # The allocator was fetched and queried once, the pool was
        # requested with the "weights" tag, and the runner loaded once
        mock_allocator_class.get_instance.assert_called_once()
        allocator.get_current_usage.assert_called_once()
        allocator.use_memory_pool.assert_called_once_with(tag="weights")
        runner.load_model.assert_called_once()
def test_load_model_without_sleep_mode(self):
    """Test load_model when enable_sleep_mode is False"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        config = MagicMock()
        config.model_config = MagicMock()
        config.model_config.enable_sleep_mode = False
        npu_worker.vllm_config = config

        # Exercise the method under test
        npu_worker.load_model()

        # The runner's load_model must have been invoked exactly once
        runner.load_model.assert_called_once()
@patch("vllm_ascend.worker.worker_v1.CaMemAllocator")
def test_load_model_sleep_mode_assertion_error(self, mock_allocator_class):
    """Test load_model raises AssertionError when allocator usage is non-zero"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        npu_worker.model_runner = MagicMock()
        config = MagicMock()
        config.model_config = MagicMock()
        config.model_config.enable_sleep_mode = True
        npu_worker.vllm_config = config

        # Allocator stub that reports a non-zero current usage
        allocator = MagicMock()
        allocator.get_current_usage.return_value = 100
        mock_allocator_class.get_instance.return_value = allocator

        # load_model is expected to fail its assertion
        with self.assertRaises(AssertionError) as raised:
            npu_worker.load_model()

        # The assertion message must contain the expected fragment
        self.assertIn("Sleep mode can only be", str(raised.exception))
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.seed_everything")
@patch("vllm_ascend.worker.worker_v1.logger")
@patch("vllm_ascend.worker.worker_v1.NPUWorker._warm_up_atb")
def test_compile_or_warm_up_model_with_eager_mode(self, mock_warm_up_atb,
                                                  mock_logger,
                                                  mock_seed_everything):
    """Test compile_or_warm_up_model when enforce_eager is True"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        npu_worker.vllm_config = MagicMock()
        npu_worker.model_config = MagicMock()
        npu_worker.model_config.enforce_eager = True
        npu_worker.model_config.seed = 12345

        # Compilation settings: four compile sizes, two of which are also
        # listed as graph capture sizes
        compilation = MagicMock()
        npu_worker.vllm_config.compilation_config = compilation
        compilation.compile_sizes = [1, 4, 8, 16]
        compilation.cudagraph_capture_sizes = [4, 8]

        # Exercise the method under test
        npu_worker.compile_or_warm_up_model()

        # _dummy_run must contain this call sequence (largest size first)
        expected_calls = [
            unittest.mock.call(size) for size in (16, 8, 4, 1)
        ]
        runner._dummy_run.assert_has_calls(expected_calls)

        # No graph capture is expected in eager mode
        runner.capture_model.assert_not_called()

        # Four info-level log calls are expected
        self.assertEqual(mock_logger.info.call_count, 4)

        # The configured seed is applied once and ATB warm-up runs once
        mock_seed_everything.assert_called_once_with(12345)
        mock_warm_up_atb.assert_called_once()
@patch("vllm_ascend.worker.worker_v1.NPUPlatform.seed_everything")
@patch("vllm_ascend.worker.worker_v1.logger")
@patch("vllm_ascend.worker.worker_v1.NPUWorker._warm_up_atb")
def test_compile_or_warm_up_model_with_graph_capture(
        self, mock_warm_up_atb, mock_logger, mock_seed_everything):
    """Test compile_or_warm_up_model when enforce_eager is False"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        npu_worker.vllm_config = MagicMock()
        npu_worker.model_config = MagicMock()
        # enforce_eager=False is the graph-capture configuration
        npu_worker.model_config.enforce_eager = False
        npu_worker.model_config.seed = 67890

        # Compilation settings: four compile sizes, two of which are also
        # listed as graph capture sizes
        compilation = MagicMock()
        npu_worker.vllm_config.compilation_config = compilation
        compilation.compile_sizes = [1, 4, 8, 16]
        compilation.cudagraph_capture_sizes = [4, 8]

        # Exercise the method under test
        npu_worker.compile_or_warm_up_model()

        # _dummy_run must contain calls for the sizes that are not in
        # cudagraph_capture_sizes, largest first
        expected_calls = [unittest.mock.call(size) for size in (16, 1)]
        runner._dummy_run.assert_has_calls(expected_calls)

        # Graph capture is expected exactly once in non-eager mode
        runner.capture_model.assert_called_once()

        # The configured seed is applied once and ATB warm-up runs once
        mock_seed_everything.assert_called_once_with(67890)
        mock_warm_up_atb.assert_called_once()
@patch("vllm_ascend.worker.worker_v1.CaMemAllocator")
def test_initialize_from_config_with_sleep_mode(self,
                                                mock_allocator_class):
    """Test initialize_from_config when enable_sleep_mode is True"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        config = MagicMock()
        config.model_config = MagicMock()
        config.model_config.enable_sleep_mode = True
        npu_worker.vllm_config = config

        # Allocator stub whose use_memory_pool returns a mock object
        allocator = MagicMock()
        allocator.use_memory_pool.return_value = MagicMock()
        mock_allocator_class.get_instance.return_value = allocator

        # Placeholder KV cache configuration passed straight through
        kv_cache_config = MagicMock()

        # Exercise the method under test
        npu_worker.initialize_from_config(kv_cache_config)

        # The allocator was fetched once, the pool was requested with the
        # "kv_cache" tag, and the runner received the same config object
        mock_allocator_class.get_instance.assert_called_once()
        allocator.use_memory_pool.assert_called_once_with(tag="kv_cache")
        runner.initialize_kv_cache.assert_called_once_with(kv_cache_config)
def test_initialize_from_config_without_sleep_mode(self):
    """Test initialize_from_config when enable_sleep_mode is False"""
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        config = MagicMock()
        config.model_config = MagicMock()
        config.model_config.enable_sleep_mode = False
        npu_worker.vllm_config = config

        # Placeholder KV cache configuration passed straight through
        kv_cache_config = MagicMock()

        # Exercise the method under test
        npu_worker.initialize_from_config(kv_cache_config)

        # The runner must receive the same config object exactly once
        runner.initialize_kv_cache.assert_called_once_with(kv_cache_config)
@patch("vllm_ascend.worker.worker_v1.get_pp_group")
@patch("vllm_ascend.worker.worker_v1.get_tp_group")
@patch("vllm_ascend.worker.worker_v1.EMPTY_MODEL_RUNNER_OUTPUT")
def test_execute_model_kv_connector_not_finished(self, mock_empty_output,
                                                 mock_get_tp_group,
                                                 mock_get_pp_group):
    """Test execute_model when kv_connector_output has finished neither sending nor receiving"""
    from vllm.sequence import IntermediateTensors
    from vllm_ascend.worker.worker_v1 import NPUWorker

    # Replace the constructor with a no-op so a bare worker can be built
    with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
        npu_worker = NPUWorker()
        runner = MagicMock()
        npu_worker.model_runner = runner
        config = MagicMock()
        config.parallel_config = MagicMock()
        config.parallel_config.distributed_executor_backend = "ray"
        npu_worker.vllm_config = config

        # PP group mock: both rank flags are False and recv_tensor_dict
        # yields a small placeholder dict
        pp_group = MagicMock(is_first_rank=False, is_last_rank=False)
        pp_group.recv_tensor_dict.return_value = {"tensor": "data"}
        mock_get_pp_group.return_value = pp_group

        # KV connector output with both completion flags set to False
        kv_output = MagicMock(finished_sending=False,
                              finished_recving=False)

        # Runner result spec'd on IntermediateTensors, carrying the KV
        # connector output defined above
        runner_output = MagicMock(spec=IntermediateTensors)
        runner_output.tensors = {"output_tensor": "data"}
        runner_output.kv_connector_output = kv_output
        runner.execute_model.return_value = runner_output

        scheduler_output = MagicMock()
        scheduler_output.total_num_scheduled_tokens = 1

        # Exercise the method under test
        result = npu_worker.execute_model(scheduler_output)

        # One receive and one send are expected on the PP group
        pp_group.recv_tensor_dict.assert_called_once()
        pp_group.send_tensor_dict.assert_called_once()

        # With both flags False, the patched EMPTY_MODEL_RUNNER_OUTPUT is
        # the expected return value
        self.assertEqual(result, mock_empty_output)