From 1a7f845696b079b3a549594801d656fe808abd26 Mon Sep 17 00:00:00 2001 From: realliujiaxu Date: Thu, 5 Mar 2026 16:18:34 +0800 Subject: [PATCH] [Feat][Worker] NPUWorker Profiler profile_prefix full adaptation (RFC #6954) (#6968) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What this PR does / why we need it? Implements [RFC #6954](https://github.com/vllm-project/vllm-ascend/issues/6954): NPUWorker Profiler profile_prefix full adaptation for API parity with upstream vLLM. ### Changes - **Lazy profiler init**: Defer profiler creation until first `profile(is_start=True)` call - **profile_prefix param**: Add `profile_prefix` to `profile()`; compute `trace_name` from prefix + `get_worker_rank_suffix()` - **Refactor `_init_profiler` → `_create_profiler(trace_name)`**: Pass `worker_name` to `tensorboard_trace_handler` for unique trace files per worker - Unique trace files per worker; no collision in multi-worker setups ### Testing - Unit tests updated/added in `tests/ut/worker/test_worker_v1.py` - `pytest tests/ut/worker/test_worker_v1.py::TestNPUWorker` passed ## Does this PR introduce _any_ user-facing change? Yes. Trace file naming may differ (more descriptive with worker rank suffix). `profile(is_start=True, profile_prefix="warmup")` now supported. ## How was this patch tested? - Unit tests:`pytest tests/ut/worker/test_worker_v1.py::TestNPUWorker` - Manual: vLLM serve with profiler config, start/stop profile, verified trace files - vLLM version: v0.16.0 - vLLM main: https://github.com/vllm-project/vllm/commit/15d76f74e2fdb12a95ea00f0ca283acf6219a2b7 --------- Signed-off-by: realliujiaxu --- tests/ut/worker/test_worker_v1.py | 236 +++++++++++++++++++----------- vllm_ascend/worker/worker.py | 106 +++++++++----- 2 files changed, 219 insertions(+), 123 deletions(-) diff --git a/tests/ut/worker/test_worker_v1.py b/tests/ut/worker/test_worker_v1.py index fb6d66c5..ffd5083a 100644 --- a/tests/ut/worker/test_worker_v1.py +++ b/tests/ut/worker/test_worker_v1.py @@ -53,10 +53,10 @@ class TestNPUWorker(TestBase): @patch("vllm_ascend.worker.worker.init_ascend_config") @patch("vllm_ascend.worker.worker.check_ascend_device_type") @patch(init_cached_hf_modules_path, create=True) - @patch("vllm_ascend.worker.worker.NPUWorker._init_profiler") + @patch("vllm_ascend.worker.worker.NPUWorker._create_profiler") def test_init_npu_worker_normal_case( self, - mock_init_profiler, + mock_create_profiler, mock_init_cached_hf_modules, mock_check_ascend_device_type, mock_init_ascend_config, @@ -94,7 +94,8 @@ class TestNPUWorker(TestBase): # Verify cache_dtype setting self.assertEqual(worker.cache_dtype, torch.float16) - mock_init_profiler.assert_called_once() + # Profiler is lazily initialized - not created during __init__ (RFC #6954) + mock_create_profiler.assert_not_called() # Verify init_cached_hf_modules is not called (trust_remote_code=False) mock_init_cached_hf_modules.assert_not_called() @@ -107,10 +108,10 @@ class TestNPUWorker(TestBase): @patch("vllm_ascend.worker.worker.init_ascend_config") @patch("vllm_ascend.worker.worker.check_ascend_device_type") @patch(init_cached_hf_modules_path, create=True) - @patch("vllm_ascend.worker.worker.NPUWorker._init_profiler") + @patch("vllm_ascend.worker.worker.NPUWorker._create_profiler") def test_init_npu_worker_with_trust_remote_code( self, - mock_init_profiler, + mock_create_profiler, mock_init_cached_hf_modules, mock_check_ascend_device_type, mock_init_ascend_config, @@ -150,10 +151,10 @@ class TestNPUWorker(TestBase): @patch("vllm_ascend.worker.worker.init_ascend_config") @patch("vllm_ascend.worker.worker.check_ascend_device_type") @patch(init_cached_hf_modules_path, create=True) - @patch("vllm_ascend.worker.worker.NPUWorker._init_profiler") + @patch("vllm_ascend.worker.worker.NPUWorker._create_profiler") def test_init_npu_worker_with_custom_cache_dtype( self, - mock_init_profiler, + mock_create_profiler, mock_init_cached_hf_modules, mock_check_ascend_device_type, mock_init_ascend_config, @@ -278,17 +279,20 @@ class TestNPUWorker(TestBase): """Test profile method start and stop""" from vllm_ascend.worker.worker import NPUWorker - # Create worker mock + profiler_config = ProfilerConfig( + profiler="torch", + torch_profiler_dir="/path/to/traces", + ) with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() + worker.profiler_config = profiler_config + worker.rank = 0 mock_profiler = MagicMock() worker.profiler = mock_profiler - # Test start profiler worker.profile(is_start=True) mock_profiler.start.assert_called_once() - # Test stop profiler worker.profile(is_start=False) mock_profiler.stop.assert_called_once() @@ -296,42 +300,141 @@ class TestNPUWorker(TestBase): """Test profile method raises exception when profiler is not available""" from vllm_ascend.worker.worker import NPUWorker - # Create worker mock + # Create worker mock - profiler_config indicates profiling disabled with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() worker.profiler = None + worker.profiler_config = ProfilerConfig(profiler=None, torch_profiler_dir="") # Test should raise exception with self.assertRaises(RuntimeError) as cm: worker.profile() - self.assertIn("Profiler is not enabled", str(cm.exception)) + self.assertIn("Profiling is not enabled", str(cm.exception)) + + def test_profile_with_prefix_uses_trace_name(self): + """[RFC #6954] profile() accepts profile_prefix and passes trace_name to _create_profiler""" + from vllm_ascend.worker.worker import NPUWorker + + profiler_config = ProfilerConfig( + profiler="torch", + torch_profiler_dir="/path/to/traces", + ) + vllm_config_mock = MagicMock() + vllm_config_mock.profiler_config = profiler_config + + with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): + worker = NPUWorker() + worker.profiler_config = profiler_config + worker.profiler = None + worker.rank = 0 + + with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"): + with patch.object(NPUWorker, "_create_profiler", return_value=MagicMock()) as mock_create: + worker.profile(is_start=True, profile_prefix="warmup") + + mock_create.assert_called_once_with("warmup_dp0_pp0_tp0_dcp0_ep0_rank0") + + def test_profile_lazy_init(self): + """[RFC #6954] Profiler is lazily created on first profile(is_start=True) call""" + from vllm_ascend.worker.worker import NPUWorker + + profiler_config = ProfilerConfig( + profiler="torch", + torch_profiler_dir="/path/to/traces", + ) + vllm_config_mock = MagicMock() + vllm_config_mock.profiler_config = profiler_config + + with patch.object(NPUWorker, "_create_profiler", return_value=MagicMock()) as mock_create: + with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): + worker = NPUWorker() + worker.profiler_config = profiler_config + worker.profiler = None + worker.rank = 0 + + self.assertIsNone(worker.profiler) + mock_create.assert_not_called() + + with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"): + worker.profile(is_start=True) + + mock_create.assert_called_once() + self.assertIsNotNone(worker.profiler) + + def test_profile_restart_reuses_existing_profiler(self): + """[RFC #6954] Restarting profile (stop then start) reuses existing profiler, does not call _create_profiler again""" + from vllm_ascend.worker.worker import NPUWorker + + profiler_config = ProfilerConfig( + profiler="torch", + torch_profiler_dir="/path/to/traces", + ) + mock_profiler = MagicMock() + + with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): + worker = NPUWorker() + worker.profiler_config = profiler_config + worker.profiler = None + worker.rank = 0 + + with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"): + with patch.object(NPUWorker, "_create_profiler", return_value=mock_profiler) as mock_create: + worker.profile(is_start=True, profile_prefix="session1") + mock_create.assert_called_once_with("session1_dp0_pp0_tp0_dcp0_ep0_rank0") + + worker.profile(is_start=False) + worker.profile(is_start=True) # Restart without new prefix + # Should NOT create new profiler, just restart existing + mock_create.assert_called_once() + + def test_trace_handler_uses_worker_name(self): + """[RFC #6954] _create_profiler passes worker_name to tensorboard_trace_handler""" + from vllm_ascend.worker.worker import NPUWorker + + profiler_config = ProfilerConfig( + profiler="torch", + torch_profiler_dir="/path/to/traces", + ) + vllm_config_mock = MagicMock() + vllm_config_mock.profiler_config = profiler_config + + with patch("vllm_ascend.worker.worker.envs_ascend") as mock_envs: + mock_envs.MSMONITOR_USE_DAEMON = 0 + with patch("torch_npu.profiler.tensorboard_trace_handler") as mock_handler: + with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): + worker = NPUWorker() + worker.profiler_config = profiler_config + worker.vllm_config = vllm_config_mock + + worker._create_profiler("warmup_dp0_pp0_tp0_dcp0_ep0_rank0") + + mock_handler.assert_called_once() + call_kwargs = mock_handler.call_args[1] if mock_handler.call_args[1] else {} + self.assertEqual(call_kwargs.get("worker_name"), "warmup_dp0_pp0_tp0_dcp0_ep0_rank0") @patch("vllm_ascend.worker.worker.envs_ascend") def test_profile_and_msmonitor_both_enabled_raises_error( self, mock_envs_ascend): - """Test profile method raises exception when both profiler and msmonitor are enabled""" + """Test _create_profiler raises when both profiler and msmonitor are enabled""" from vllm_ascend.worker.worker import NPUWorker mock_envs_ascend.MSMONITOR_USE_DAEMON = 1 - # Create profiler config object profiler_config = ProfilerConfig( profiler="torch", torch_profiler_dir="/path/to/traces" ) - vllm_config_mock = MagicMock() vllm_config_mock.profiler_config = profiler_config - # Create worker mock with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() + worker.profiler_config = profiler_config worker.vllm_config = vllm_config_mock - # Test should raise exception with self.assertRaises(RuntimeError) as cm: - _ = worker._init_profiler() + _ = worker._create_profiler("test_trace") self.assertIn( "MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.", @@ -417,7 +520,7 @@ class TestNPUWorker(TestBase): mock_model_runner._dummy_run.assert_called_once_with( num_tokens=mock_decode_token_per_req, uniform_decode=True) - @patch("vllm_ascend.worker.worker.logger") + @patch("vllm_ascend.worker.worker.envs_ascend") @patch("torch_npu.profiler._ExperimentalConfig") @patch("torch_npu.profiler.profile") @patch("torch_npu.profiler.tensorboard_trace_handler") @@ -425,7 +528,7 @@ class TestNPUWorker(TestBase): @patch("torch_npu.profiler.ProfilerLevel") @patch("torch_npu.profiler.AiCMetrics") @patch("torch_npu.profiler.ProfilerActivity") - def test_init_profiler_enabled( + def test_create_profiler_enabled( self, mock_profiler_activity, mock_aic_metrics, @@ -434,30 +537,28 @@ class TestNPUWorker(TestBase): mock_trace_handler, mock_profile, mock_experimental_config, - mock_logger, + mock_envs_ascend, ): - """Test _init_profiler method - profiler enabled case with stack and memory profiling enabled""" + """Test _create_profiler - profiler enabled with worker_name for trace naming (RFC #6954)""" from vllm_ascend.worker.worker import NPUWorker - # Create profiler config object + mock_envs_ascend.MSMONITOR_USE_DAEMON = 0 + profiler_config = ProfilerConfig( profiler="torch", torch_profiler_dir="/path/to/traces", torch_profiler_with_stack=True, torch_profiler_with_memory=True ) - vllm_config_mock = MagicMock() vllm_config_mock.profiler_config = profiler_config - # Set enum mocks mock_export_type.Text = "Text" mock_profiler_level.Level1 = "Level1" mock_aic_metrics.AiCoreNone = "AiCoreNone" mock_profiler_activity.CPU = "CPU" mock_profiler_activity.NPU = "NPU" - # Set mock return values mock_experimental_config_instance = MagicMock() mock_experimental_config.return_value = mock_experimental_config_instance mock_trace_handler_instance = MagicMock() @@ -465,25 +566,16 @@ class TestNPUWorker(TestBase): mock_profiler_instance = MagicMock() mock_profile.return_value = mock_profiler_instance - # Create worker mock with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() + worker.profiler_config = profiler_config worker.vllm_config = vllm_config_mock - # Test _init_profiler - result = worker._init_profiler() + result = worker._create_profiler("warmup_dp0_pp0_tp0_dcp0_ep0_rank0") - # Verify log output - mock_logger.info.assert_called_once_with( - "Profiling enabled. Traces will be saved to: %s", - "/path/to/traces") - - # Verify ExperimentalConfig creation mock_experimental_config.assert_called_once() config_call = mock_experimental_config.call_args config_kwargs = config_call.kwargs - - # Verify configuration parameters expected_config = { "export_type": "Text", "profiler_level": "Level1", @@ -491,82 +583,60 @@ class TestNPUWorker(TestBase): "aic_metrics": "AiCoreNone", "l2_cache": False, "op_attr": False, - "data_simplification": False, + "data_simplification": True, "record_op_args": False, "gc_detect_threshold": None, } for key, expected_value in expected_config.items(): self.assertEqual(config_kwargs[key], expected_value) - # Verify trace handler creation - mock_trace_handler.assert_called_once_with("/path/to/traces") + # Verify trace handler called with worker_name (RFC #6954) + mock_trace_handler.assert_called_once_with( + "/path/to/traces", + worker_name="warmup_dp0_pp0_tp0_dcp0_ep0_rank0", + ) - # Verify profiler creation mock_profile.assert_called_once() - profile_call = mock_profile.call_args - profile_kwargs = profile_call.kwargs - - # Verify profiler parameters + profile_kwargs = mock_profile.call_args.kwargs expected_activities = ["CPU", "NPU"] self.assertEqual(profile_kwargs["activities"], expected_activities) - self.assertTrue(profile_kwargs["with_stack"]) self.assertTrue(profile_kwargs["profile_memory"]) - self.assertFalse(profile_kwargs["with_modules"]) - self.assertEqual(profile_kwargs["experimental_config"], - mock_experimental_config_instance) - self.assertEqual(profile_kwargs["on_trace_ready"], - mock_trace_handler_instance) - - # Verify return value + self.assertEqual(profile_kwargs["on_trace_ready"], mock_trace_handler_instance) self.assertEqual(result, mock_profiler_instance) - def test_init_profiler_disabled(self): - """Test _init_profiler method - profiler disabled case""" + def test_create_profiler_disabled(self): + """Test _create_profiler raises when profiler disabled""" from vllm_ascend.worker.worker import NPUWorker - # Create profiler config object with profiler disabled profiler_config = ProfilerConfig( profiler=None, torch_profiler_dir="" ) - vllm_config_mock = MagicMock() - vllm_config_mock.profiler_config = profiler_config - - # Create worker mock with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() - worker.vllm_config = vllm_config_mock + worker.profiler_config = profiler_config - # Test _init_profiler - result = worker._init_profiler() + with self.assertRaises(RuntimeError) as cm: + worker._create_profiler("test_trace") + self.assertIn("Unrecognized profiler: None", str(cm.exception)) - # Verify returns None - self.assertIsNone(result) - - def test_init_profiler_empty_dir(self): - """Test _init_profiler method - empty directory string case""" + def test_create_profiler_empty_dir(self): + """Test _create_profiler raises when torch_profiler_dir is empty/falsy""" from vllm_ascend.worker.worker import NPUWorker - # Create profiler config object with empty dir - profiler_config = ProfilerConfig( - profiler="torch", - torch_profiler_dir="" - ) + # Use MagicMock to bypass ProfilerConfig validation (empty dir not allowed) + profiler_config = MagicMock() + profiler_config.profiler = "torch" + profiler_config.torch_profiler_dir = "" - vllm_config_mock = MagicMock() - vllm_config_mock.profiler_config = profiler_config - - # Create worker mock with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None): worker = NPUWorker() - worker.vllm_config = vllm_config_mock + worker.profiler_config = profiler_config - # Test _init_profiler - result = worker._init_profiler() - - # Verify returns None (empty string is considered false) - self.assertIsNone(result) + with self.assertRaises(RuntimeError) as cm: + worker._create_profiler("test_trace") + self.assertIn("torch_profiler_dir cannot be empty", str(cm.exception)) @patch("torch.npu.reset_peak_memory_stats") @patch("torch.npu.empty_cache") diff --git a/vllm_ascend/worker/worker.py b/vllm_ascend/worker/worker.py index 76bee45c..d746b1cd 100644 --- a/vllm_ascend/worker/worker.py +++ b/vllm_ascend/worker/worker.py @@ -125,7 +125,9 @@ class NPUWorker(WorkerBase): else: self.cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[self.cache_config.cache_dtype] - self.profiler = self._init_profiler() + # Profiler is lazily initialized on first profile(is_start=True) call (RFC #6954) + self.profiler_config = vllm_config.profiler_config + self.profiler = None if vllm_config.model_config and vllm_config.model_config.enable_sleep_mode: # Buffers saved before sleep self._sleep_saved_buffers: dict[str, torch.Tensor] = {} @@ -511,12 +513,34 @@ class NPUWorker(WorkerBase): with context: self.model_runner.initialize_kv_cache(kv_cache_config) - def profile(self, is_start: bool = True): - if self.profiler is None: - raise RuntimeError("Profiler is not enabled.") + def profile(self, is_start: bool = True, profile_prefix: str | None = None): + # Check if profiling is enabled (RFC #6954 - align with upstream vLLM) + if self.profiler_config is None or self.profiler_config.profiler is None: + raise RuntimeError( + "Profiling is not enabled. Please set --profiler-config to enable " + "profiling. Example: " + "'--profiler-config.profiler=torch --profiler-config.torch_profiler_dir" + "=YOUR_DIR_PATH_TO_DUMP_TRACE'" + ) + if is_start: - self.profiler.start() + from vllm.distributed.utils import get_worker_rank_suffix + + rank_suffix = get_worker_rank_suffix(global_rank=self.rank) + trace_name = f"{profile_prefix}_{rank_suffix}" if profile_prefix else rank_suffix + + if self.profiler is None: + self.profiler = self._create_profiler(trace_name) + logger.debug("Starting torch profiler with trace name: %s", trace_name) + self.profiler.start() # type: ignore[attr-defined] + else: + # Profiler already initialized. Restart profiling but keep + # the original trace name from the first initialization. + self.profiler.start() else: + if self.profiler is None: + logger.warning("Profiler was not started, nothing to stop.") + return self.profiler.stop() def add_lora(self, lora_request: LoRARequest) -> bool: @@ -553,43 +577,45 @@ class NPUWorker(WorkerBase): ensure_kv_transfer_initialized(self.vllm_config) ensure_ec_transfer_initialized(self.vllm_config) - def _init_profiler(self): - # Torch profiler. Enabled through profiler_config: - # --profiler-config.profiler=torch --profiler-config.torch_profiler_dir=/path/to/save/trace - profiler_config = self.vllm_config.profiler_config - if profiler_config.profiler == "torch" and profiler_config.torch_profiler_dir: - if envs_ascend.MSMONITOR_USE_DAEMON: - raise RuntimeError("MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.") - torch_profiler_trace_dir = profiler_config.torch_profiler_dir - logger.info("Profiling enabled. Traces will be saved to: %s", torch_profiler_trace_dir) + def _create_profiler(self, trace_name: str): + """Create torch_npu profiler with trace naming for unique files per worker (RFC #6954).""" + profiler_config = self.profiler_config - experimental_config = torch_npu.profiler._ExperimentalConfig( - export_type=torch_npu.profiler.ExportType.Text, - profiler_level=torch_npu.profiler.ProfilerLevel.Level1, - msprof_tx=False, - aic_metrics=torch_npu.profiler.AiCMetrics.AiCoreNone, - l2_cache=False, - op_attr=False, - data_simplification=True, - record_op_args=False, - gc_detect_threshold=None, - ) + if profiler_config.profiler != "torch": + raise RuntimeError(f"Unrecognized profiler: {profiler_config.profiler}") + if not profiler_config.torch_profiler_dir: + raise RuntimeError("torch_profiler_dir cannot be empty.") + if envs_ascend.MSMONITOR_USE_DAEMON: + raise RuntimeError("MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.") - return torch_npu.profiler.profile( - activities=[ - torch_npu.profiler.ProfilerActivity.CPU, - torch_npu.profiler.ProfilerActivity.NPU, - ], - with_stack=False, - profile_memory=profiler_config.torch_profiler_with_memory, - # NOTE: torch_npu.profiler.with_modules is equivalent to torch.profiler.with_stack. - # The with_stack option in torch_npu.profiler introduces significant time overhead. - with_modules=profiler_config.torch_profiler_with_stack, - experimental_config=experimental_config, - on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(torch_profiler_trace_dir), - ) - else: - return None + experimental_config = torch_npu.profiler._ExperimentalConfig( + export_type=torch_npu.profiler.ExportType.Text, + profiler_level=torch_npu.profiler.ProfilerLevel.Level1, + msprof_tx=False, + aic_metrics=torch_npu.profiler.AiCMetrics.AiCoreNone, + l2_cache=False, + op_attr=False, + data_simplification=True, + record_op_args=False, + gc_detect_threshold=None, + ) + + return torch_npu.profiler.profile( + activities=[ + torch_npu.profiler.ProfilerActivity.CPU, + torch_npu.profiler.ProfilerActivity.NPU, + ], + with_stack=False, + profile_memory=profiler_config.torch_profiler_with_memory, + # NOTE: torch_npu.profiler.with_modules is equivalent to torch.profiler.with_stack. + # The with_stack option in torch_npu.profiler introduces significant time overhead. + with_modules=profiler_config.torch_profiler_with_stack, + experimental_config=experimental_config, + on_trace_ready=torch_npu.profiler.tensorboard_trace_handler( + profiler_config.torch_profiler_dir, + worker_name=trace_name, + ), + ) def get_supported_pooling_tasks(self): return self.model_runner.get_supported_pooling_tasks()