## What this PR does / why we need it?
Implements [RFC
#6954](https://github.com/vllm-project/vllm-ascend/issues/6954):
NPUWorker Profiler profile_prefix full adaptation for API parity with
upstream vLLM.
### Changes
- **Lazy profiler init**: Defer profiler creation until first
`profile(is_start=True)` call
- **profile_prefix param**: Add `profile_prefix` to `profile()`; compute
`trace_name` from prefix + `get_worker_rank_suffix()`
- **Refactor `_init_profiler` → `_create_profiler(trace_name)`**: Pass
`worker_name` to `tensorboard_trace_handler` for unique trace files per
worker
- Unique trace files per worker; no collision in multi-worker setups
### Testing
- Unit tests updated/added in `tests/ut/worker/test_worker_v1.py`
- `pytest tests/ut/worker/test_worker_v1.py::TestNPUWorker` passed
## Does this PR introduce _any_ user-facing change?
Yes. Trace file naming may differ (more descriptive with worker rank
suffix). `profile(is_start=True, profile_prefix="warmup")` now
supported.
## How was this patch tested?
- Unit tests:`pytest tests/ut/worker/test_worker_v1.py::TestNPUWorker`
- Manual: vLLM serve with profiler config, start/stop profile, verified
trace files
- vLLM version: v0.16.0
- vLLM main:
15d76f74e2
---------
Signed-off-by: realliujiaxu <realliujiaxu@163.com>
This commit is contained in:
@@ -53,10 +53,10 @@ class TestNPUWorker(TestBase):
|
||||
@patch("vllm_ascend.worker.worker.init_ascend_config")
|
||||
@patch("vllm_ascend.worker.worker.check_ascend_device_type")
|
||||
@patch(init_cached_hf_modules_path, create=True)
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._init_profiler")
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._create_profiler")
|
||||
def test_init_npu_worker_normal_case(
|
||||
self,
|
||||
mock_init_profiler,
|
||||
mock_create_profiler,
|
||||
mock_init_cached_hf_modules,
|
||||
mock_check_ascend_device_type,
|
||||
mock_init_ascend_config,
|
||||
@@ -94,7 +94,8 @@ class TestNPUWorker(TestBase):
|
||||
|
||||
# Verify cache_dtype setting
|
||||
self.assertEqual(worker.cache_dtype, torch.float16)
|
||||
mock_init_profiler.assert_called_once()
|
||||
# Profiler is lazily initialized - not created during __init__ (RFC #6954)
|
||||
mock_create_profiler.assert_not_called()
|
||||
|
||||
# Verify init_cached_hf_modules is not called (trust_remote_code=False)
|
||||
mock_init_cached_hf_modules.assert_not_called()
|
||||
@@ -107,10 +108,10 @@ class TestNPUWorker(TestBase):
|
||||
@patch("vllm_ascend.worker.worker.init_ascend_config")
|
||||
@patch("vllm_ascend.worker.worker.check_ascend_device_type")
|
||||
@patch(init_cached_hf_modules_path, create=True)
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._init_profiler")
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._create_profiler")
|
||||
def test_init_npu_worker_with_trust_remote_code(
|
||||
self,
|
||||
mock_init_profiler,
|
||||
mock_create_profiler,
|
||||
mock_init_cached_hf_modules,
|
||||
mock_check_ascend_device_type,
|
||||
mock_init_ascend_config,
|
||||
@@ -150,10 +151,10 @@ class TestNPUWorker(TestBase):
|
||||
@patch("vllm_ascend.worker.worker.init_ascend_config")
|
||||
@patch("vllm_ascend.worker.worker.check_ascend_device_type")
|
||||
@patch(init_cached_hf_modules_path, create=True)
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._init_profiler")
|
||||
@patch("vllm_ascend.worker.worker.NPUWorker._create_profiler")
|
||||
def test_init_npu_worker_with_custom_cache_dtype(
|
||||
self,
|
||||
mock_init_profiler,
|
||||
mock_create_profiler,
|
||||
mock_init_cached_hf_modules,
|
||||
mock_check_ascend_device_type,
|
||||
mock_init_ascend_config,
|
||||
@@ -278,17 +279,20 @@ class TestNPUWorker(TestBase):
|
||||
"""Test profile method start and stop"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
# Create worker mock
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
)
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.rank = 0
|
||||
mock_profiler = MagicMock()
|
||||
worker.profiler = mock_profiler
|
||||
|
||||
# Test start profiler
|
||||
worker.profile(is_start=True)
|
||||
mock_profiler.start.assert_called_once()
|
||||
|
||||
# Test stop profiler
|
||||
worker.profile(is_start=False)
|
||||
mock_profiler.stop.assert_called_once()
|
||||
|
||||
@@ -296,42 +300,141 @@ class TestNPUWorker(TestBase):
|
||||
"""Test profile method raises exception when profiler is not available"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
# Create worker mock
|
||||
# Create worker mock - profiler_config indicates profiling disabled
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler = None
|
||||
worker.profiler_config = ProfilerConfig(profiler=None, torch_profiler_dir="")
|
||||
|
||||
# Test should raise exception
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
worker.profile()
|
||||
|
||||
self.assertIn("Profiler is not enabled", str(cm.exception))
|
||||
self.assertIn("Profiling is not enabled", str(cm.exception))
|
||||
|
||||
def test_profile_with_prefix_uses_trace_name(self):
|
||||
"""[RFC #6954] profile() accepts profile_prefix and passes trace_name to _create_profiler"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
)
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.profiler = None
|
||||
worker.rank = 0
|
||||
|
||||
with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"):
|
||||
with patch.object(NPUWorker, "_create_profiler", return_value=MagicMock()) as mock_create:
|
||||
worker.profile(is_start=True, profile_prefix="warmup")
|
||||
|
||||
mock_create.assert_called_once_with("warmup_dp0_pp0_tp0_dcp0_ep0_rank0")
|
||||
|
||||
def test_profile_lazy_init(self):
|
||||
"""[RFC #6954] Profiler is lazily created on first profile(is_start=True) call"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
)
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
with patch.object(NPUWorker, "_create_profiler", return_value=MagicMock()) as mock_create:
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.profiler = None
|
||||
worker.rank = 0
|
||||
|
||||
self.assertIsNone(worker.profiler)
|
||||
mock_create.assert_not_called()
|
||||
|
||||
with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"):
|
||||
worker.profile(is_start=True)
|
||||
|
||||
mock_create.assert_called_once()
|
||||
self.assertIsNotNone(worker.profiler)
|
||||
|
||||
def test_profile_restart_reuses_existing_profiler(self):
|
||||
"""[RFC #6954] Restarting profile (stop then start) reuses existing profiler, does not call _create_profiler again"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
)
|
||||
mock_profiler = MagicMock()
|
||||
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.profiler = None
|
||||
worker.rank = 0
|
||||
|
||||
with patch("vllm.distributed.utils.get_worker_rank_suffix", return_value="dp0_pp0_tp0_dcp0_ep0_rank0"):
|
||||
with patch.object(NPUWorker, "_create_profiler", return_value=mock_profiler) as mock_create:
|
||||
worker.profile(is_start=True, profile_prefix="session1")
|
||||
mock_create.assert_called_once_with("session1_dp0_pp0_tp0_dcp0_ep0_rank0")
|
||||
|
||||
worker.profile(is_start=False)
|
||||
worker.profile(is_start=True) # Restart without new prefix
|
||||
# Should NOT create new profiler, just restart existing
|
||||
mock_create.assert_called_once()
|
||||
|
||||
def test_trace_handler_uses_worker_name(self):
|
||||
"""[RFC #6954] _create_profiler passes worker_name to tensorboard_trace_handler"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
)
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
with patch("vllm_ascend.worker.worker.envs_ascend") as mock_envs:
|
||||
mock_envs.MSMONITOR_USE_DAEMON = 0
|
||||
with patch("torch_npu.profiler.tensorboard_trace_handler") as mock_handler:
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.vllm_config = vllm_config_mock
|
||||
|
||||
worker._create_profiler("warmup_dp0_pp0_tp0_dcp0_ep0_rank0")
|
||||
|
||||
mock_handler.assert_called_once()
|
||||
call_kwargs = mock_handler.call_args[1] if mock_handler.call_args[1] else {}
|
||||
self.assertEqual(call_kwargs.get("worker_name"), "warmup_dp0_pp0_tp0_dcp0_ep0_rank0")
|
||||
|
||||
@patch("vllm_ascend.worker.worker.envs_ascend")
|
||||
def test_profile_and_msmonitor_both_enabled_raises_error(
|
||||
self, mock_envs_ascend):
|
||||
"""Test profile method raises exception when both profiler and msmonitor are enabled"""
|
||||
"""Test _create_profiler raises when both profiler and msmonitor are enabled"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
mock_envs_ascend.MSMONITOR_USE_DAEMON = 1
|
||||
|
||||
# Create profiler config object
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces"
|
||||
)
|
||||
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
# Create worker mock
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.vllm_config = vllm_config_mock
|
||||
|
||||
# Test should raise exception
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
_ = worker._init_profiler()
|
||||
_ = worker._create_profiler("test_trace")
|
||||
|
||||
self.assertIn(
|
||||
"MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.",
|
||||
@@ -417,7 +520,7 @@ class TestNPUWorker(TestBase):
|
||||
mock_model_runner._dummy_run.assert_called_once_with(
|
||||
num_tokens=mock_decode_token_per_req, uniform_decode=True)
|
||||
|
||||
@patch("vllm_ascend.worker.worker.logger")
|
||||
@patch("vllm_ascend.worker.worker.envs_ascend")
|
||||
@patch("torch_npu.profiler._ExperimentalConfig")
|
||||
@patch("torch_npu.profiler.profile")
|
||||
@patch("torch_npu.profiler.tensorboard_trace_handler")
|
||||
@@ -425,7 +528,7 @@ class TestNPUWorker(TestBase):
|
||||
@patch("torch_npu.profiler.ProfilerLevel")
|
||||
@patch("torch_npu.profiler.AiCMetrics")
|
||||
@patch("torch_npu.profiler.ProfilerActivity")
|
||||
def test_init_profiler_enabled(
|
||||
def test_create_profiler_enabled(
|
||||
self,
|
||||
mock_profiler_activity,
|
||||
mock_aic_metrics,
|
||||
@@ -434,30 +537,28 @@ class TestNPUWorker(TestBase):
|
||||
mock_trace_handler,
|
||||
mock_profile,
|
||||
mock_experimental_config,
|
||||
mock_logger,
|
||||
mock_envs_ascend,
|
||||
):
|
||||
"""Test _init_profiler method - profiler enabled case with stack and memory profiling enabled"""
|
||||
"""Test _create_profiler - profiler enabled with worker_name for trace naming (RFC #6954)"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
# Create profiler config object
|
||||
mock_envs_ascend.MSMONITOR_USE_DAEMON = 0
|
||||
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir="/path/to/traces",
|
||||
torch_profiler_with_stack=True,
|
||||
torch_profiler_with_memory=True
|
||||
)
|
||||
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
# Set enum mocks
|
||||
mock_export_type.Text = "Text"
|
||||
mock_profiler_level.Level1 = "Level1"
|
||||
mock_aic_metrics.AiCoreNone = "AiCoreNone"
|
||||
mock_profiler_activity.CPU = "CPU"
|
||||
mock_profiler_activity.NPU = "NPU"
|
||||
|
||||
# Set mock return values
|
||||
mock_experimental_config_instance = MagicMock()
|
||||
mock_experimental_config.return_value = mock_experimental_config_instance
|
||||
mock_trace_handler_instance = MagicMock()
|
||||
@@ -465,25 +566,16 @@ class TestNPUWorker(TestBase):
|
||||
mock_profiler_instance = MagicMock()
|
||||
mock_profile.return_value = mock_profiler_instance
|
||||
|
||||
# Create worker mock
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.profiler_config = profiler_config
|
||||
worker.vllm_config = vllm_config_mock
|
||||
|
||||
# Test _init_profiler
|
||||
result = worker._init_profiler()
|
||||
result = worker._create_profiler("warmup_dp0_pp0_tp0_dcp0_ep0_rank0")
|
||||
|
||||
# Verify log output
|
||||
mock_logger.info.assert_called_once_with(
|
||||
"Profiling enabled. Traces will be saved to: %s",
|
||||
"/path/to/traces")
|
||||
|
||||
# Verify ExperimentalConfig creation
|
||||
mock_experimental_config.assert_called_once()
|
||||
config_call = mock_experimental_config.call_args
|
||||
config_kwargs = config_call.kwargs
|
||||
|
||||
# Verify configuration parameters
|
||||
expected_config = {
|
||||
"export_type": "Text",
|
||||
"profiler_level": "Level1",
|
||||
@@ -491,82 +583,60 @@ class TestNPUWorker(TestBase):
|
||||
"aic_metrics": "AiCoreNone",
|
||||
"l2_cache": False,
|
||||
"op_attr": False,
|
||||
"data_simplification": False,
|
||||
"data_simplification": True,
|
||||
"record_op_args": False,
|
||||
"gc_detect_threshold": None,
|
||||
}
|
||||
for key, expected_value in expected_config.items():
|
||||
self.assertEqual(config_kwargs[key], expected_value)
|
||||
|
||||
# Verify trace handler creation
|
||||
mock_trace_handler.assert_called_once_with("/path/to/traces")
|
||||
# Verify trace handler called with worker_name (RFC #6954)
|
||||
mock_trace_handler.assert_called_once_with(
|
||||
"/path/to/traces",
|
||||
worker_name="warmup_dp0_pp0_tp0_dcp0_ep0_rank0",
|
||||
)
|
||||
|
||||
# Verify profiler creation
|
||||
mock_profile.assert_called_once()
|
||||
profile_call = mock_profile.call_args
|
||||
profile_kwargs = profile_call.kwargs
|
||||
|
||||
# Verify profiler parameters
|
||||
profile_kwargs = mock_profile.call_args.kwargs
|
||||
expected_activities = ["CPU", "NPU"]
|
||||
self.assertEqual(profile_kwargs["activities"], expected_activities)
|
||||
self.assertTrue(profile_kwargs["with_stack"])
|
||||
self.assertTrue(profile_kwargs["profile_memory"])
|
||||
self.assertFalse(profile_kwargs["with_modules"])
|
||||
self.assertEqual(profile_kwargs["experimental_config"],
|
||||
mock_experimental_config_instance)
|
||||
self.assertEqual(profile_kwargs["on_trace_ready"],
|
||||
mock_trace_handler_instance)
|
||||
|
||||
# Verify return value
|
||||
self.assertEqual(profile_kwargs["on_trace_ready"], mock_trace_handler_instance)
|
||||
self.assertEqual(result, mock_profiler_instance)
|
||||
|
||||
def test_init_profiler_disabled(self):
|
||||
"""Test _init_profiler method - profiler disabled case"""
|
||||
def test_create_profiler_disabled(self):
|
||||
"""Test _create_profiler raises when profiler disabled"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
# Create profiler config object with profiler disabled
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler=None,
|
||||
torch_profiler_dir=""
|
||||
)
|
||||
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
# Create worker mock
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.vllm_config = vllm_config_mock
|
||||
worker.profiler_config = profiler_config
|
||||
|
||||
# Test _init_profiler
|
||||
result = worker._init_profiler()
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
worker._create_profiler("test_trace")
|
||||
self.assertIn("Unrecognized profiler: None", str(cm.exception))
|
||||
|
||||
# Verify returns None
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_init_profiler_empty_dir(self):
|
||||
"""Test _init_profiler method - empty directory string case"""
|
||||
def test_create_profiler_empty_dir(self):
|
||||
"""Test _create_profiler raises when torch_profiler_dir is empty/falsy"""
|
||||
from vllm_ascend.worker.worker import NPUWorker
|
||||
|
||||
# Create profiler config object with empty dir
|
||||
profiler_config = ProfilerConfig(
|
||||
profiler="torch",
|
||||
torch_profiler_dir=""
|
||||
)
|
||||
# Use MagicMock to bypass ProfilerConfig validation (empty dir not allowed)
|
||||
profiler_config = MagicMock()
|
||||
profiler_config.profiler = "torch"
|
||||
profiler_config.torch_profiler_dir = ""
|
||||
|
||||
vllm_config_mock = MagicMock()
|
||||
vllm_config_mock.profiler_config = profiler_config
|
||||
|
||||
# Create worker mock
|
||||
with patch.object(NPUWorker, "__init__", lambda x, **kwargs: None):
|
||||
worker = NPUWorker()
|
||||
worker.vllm_config = vllm_config_mock
|
||||
worker.profiler_config = profiler_config
|
||||
|
||||
# Test _init_profiler
|
||||
result = worker._init_profiler()
|
||||
|
||||
# Verify returns None (empty string is considered false)
|
||||
self.assertIsNone(result)
|
||||
with self.assertRaises(RuntimeError) as cm:
|
||||
worker._create_profiler("test_trace")
|
||||
self.assertIn("torch_profiler_dir cannot be empty", str(cm.exception))
|
||||
|
||||
@patch("torch.npu.reset_peak_memory_stats")
|
||||
@patch("torch.npu.empty_cache")
|
||||
|
||||
@@ -125,7 +125,9 @@ class NPUWorker(WorkerBase):
|
||||
else:
|
||||
self.cache_dtype = STR_DTYPE_TO_TORCH_DTYPE[self.cache_config.cache_dtype]
|
||||
|
||||
self.profiler = self._init_profiler()
|
||||
# Profiler is lazily initialized on first profile(is_start=True) call (RFC #6954)
|
||||
self.profiler_config = vllm_config.profiler_config
|
||||
self.profiler = None
|
||||
if vllm_config.model_config and vllm_config.model_config.enable_sleep_mode:
|
||||
# Buffers saved before sleep
|
||||
self._sleep_saved_buffers: dict[str, torch.Tensor] = {}
|
||||
@@ -511,12 +513,34 @@ class NPUWorker(WorkerBase):
|
||||
with context:
|
||||
self.model_runner.initialize_kv_cache(kv_cache_config)
|
||||
|
||||
def profile(self, is_start: bool = True):
|
||||
if self.profiler is None:
|
||||
raise RuntimeError("Profiler is not enabled.")
|
||||
def profile(self, is_start: bool = True, profile_prefix: str | None = None):
|
||||
# Check if profiling is enabled (RFC #6954 - align with upstream vLLM)
|
||||
if self.profiler_config is None or self.profiler_config.profiler is None:
|
||||
raise RuntimeError(
|
||||
"Profiling is not enabled. Please set --profiler-config to enable "
|
||||
"profiling. Example: "
|
||||
"'--profiler-config.profiler=torch --profiler-config.torch_profiler_dir"
|
||||
"=YOUR_DIR_PATH_TO_DUMP_TRACE'"
|
||||
)
|
||||
|
||||
if is_start:
|
||||
self.profiler.start()
|
||||
from vllm.distributed.utils import get_worker_rank_suffix
|
||||
|
||||
rank_suffix = get_worker_rank_suffix(global_rank=self.rank)
|
||||
trace_name = f"{profile_prefix}_{rank_suffix}" if profile_prefix else rank_suffix
|
||||
|
||||
if self.profiler is None:
|
||||
self.profiler = self._create_profiler(trace_name)
|
||||
logger.debug("Starting torch profiler with trace name: %s", trace_name)
|
||||
self.profiler.start() # type: ignore[attr-defined]
|
||||
else:
|
||||
# Profiler already initialized. Restart profiling but keep
|
||||
# the original trace name from the first initialization.
|
||||
self.profiler.start()
|
||||
else:
|
||||
if self.profiler is None:
|
||||
logger.warning("Profiler was not started, nothing to stop.")
|
||||
return
|
||||
self.profiler.stop()
|
||||
|
||||
def add_lora(self, lora_request: LoRARequest) -> bool:
|
||||
@@ -553,43 +577,45 @@ class NPUWorker(WorkerBase):
|
||||
ensure_kv_transfer_initialized(self.vllm_config)
|
||||
ensure_ec_transfer_initialized(self.vllm_config)
|
||||
|
||||
def _init_profiler(self):
|
||||
# Torch profiler. Enabled through profiler_config:
|
||||
# --profiler-config.profiler=torch --profiler-config.torch_profiler_dir=/path/to/save/trace
|
||||
profiler_config = self.vllm_config.profiler_config
|
||||
if profiler_config.profiler == "torch" and profiler_config.torch_profiler_dir:
|
||||
if envs_ascend.MSMONITOR_USE_DAEMON:
|
||||
raise RuntimeError("MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.")
|
||||
torch_profiler_trace_dir = profiler_config.torch_profiler_dir
|
||||
logger.info("Profiling enabled. Traces will be saved to: %s", torch_profiler_trace_dir)
|
||||
def _create_profiler(self, trace_name: str):
|
||||
"""Create torch_npu profiler with trace naming for unique files per worker (RFC #6954)."""
|
||||
profiler_config = self.profiler_config
|
||||
|
||||
experimental_config = torch_npu.profiler._ExperimentalConfig(
|
||||
export_type=torch_npu.profiler.ExportType.Text,
|
||||
profiler_level=torch_npu.profiler.ProfilerLevel.Level1,
|
||||
msprof_tx=False,
|
||||
aic_metrics=torch_npu.profiler.AiCMetrics.AiCoreNone,
|
||||
l2_cache=False,
|
||||
op_attr=False,
|
||||
data_simplification=True,
|
||||
record_op_args=False,
|
||||
gc_detect_threshold=None,
|
||||
)
|
||||
if profiler_config.profiler != "torch":
|
||||
raise RuntimeError(f"Unrecognized profiler: {profiler_config.profiler}")
|
||||
if not profiler_config.torch_profiler_dir:
|
||||
raise RuntimeError("torch_profiler_dir cannot be empty.")
|
||||
if envs_ascend.MSMONITOR_USE_DAEMON:
|
||||
raise RuntimeError("MSMONITOR_USE_DAEMON and torch profiler cannot be both enabled at the same time.")
|
||||
|
||||
return torch_npu.profiler.profile(
|
||||
activities=[
|
||||
torch_npu.profiler.ProfilerActivity.CPU,
|
||||
torch_npu.profiler.ProfilerActivity.NPU,
|
||||
],
|
||||
with_stack=False,
|
||||
profile_memory=profiler_config.torch_profiler_with_memory,
|
||||
# NOTE: torch_npu.profiler.with_modules is equivalent to torch.profiler.with_stack.
|
||||
# The with_stack option in torch_npu.profiler introduces significant time overhead.
|
||||
with_modules=profiler_config.torch_profiler_with_stack,
|
||||
experimental_config=experimental_config,
|
||||
on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(torch_profiler_trace_dir),
|
||||
)
|
||||
else:
|
||||
return None
|
||||
experimental_config = torch_npu.profiler._ExperimentalConfig(
|
||||
export_type=torch_npu.profiler.ExportType.Text,
|
||||
profiler_level=torch_npu.profiler.ProfilerLevel.Level1,
|
||||
msprof_tx=False,
|
||||
aic_metrics=torch_npu.profiler.AiCMetrics.AiCoreNone,
|
||||
l2_cache=False,
|
||||
op_attr=False,
|
||||
data_simplification=True,
|
||||
record_op_args=False,
|
||||
gc_detect_threshold=None,
|
||||
)
|
||||
|
||||
return torch_npu.profiler.profile(
|
||||
activities=[
|
||||
torch_npu.profiler.ProfilerActivity.CPU,
|
||||
torch_npu.profiler.ProfilerActivity.NPU,
|
||||
],
|
||||
with_stack=False,
|
||||
profile_memory=profiler_config.torch_profiler_with_memory,
|
||||
# NOTE: torch_npu.profiler.with_modules is equivalent to torch.profiler.with_stack.
|
||||
# The with_stack option in torch_npu.profiler introduces significant time overhead.
|
||||
with_modules=profiler_config.torch_profiler_with_stack,
|
||||
experimental_config=experimental_config,
|
||||
on_trace_ready=torch_npu.profiler.tensorboard_trace_handler(
|
||||
profiler_config.torch_profiler_dir,
|
||||
worker_name=trace_name,
|
||||
),
|
||||
)
|
||||
|
||||
def get_supported_pooling_tasks(self):
|
||||
return self.model_runner.get_supported_pooling_tasks()
|
||||
|
||||
Reference in New Issue
Block a user