Files
2026-03-10 13:31:25 +08:00

240 lines
11 KiB
Python

################################################################################
# Copyright(c)2020-2025 Shanghai Biren Technology Co., Ltd. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
################################################################################
import dataclasses
from contextlib import ExitStack
from typing import Any, Callable, Optional
from unittest.mock import patch
import torch
import torch_br
import vllm.envs as envs
from vllm.compilation.counter import compilation_counter
from vllm.config import VllmConfig
from vllm.distributed.device_communicators.pynccl_allocator import (
set_graph_pool_id)
from vllm.distributed.parallel_state import get_world_group
from vllm.forward_context import BatchDescriptor, get_forward_context
from vllm.logger import init_logger, logger
from vllm.platforms import current_platform
from vllm_br.compilation.monitor import validate_supagraph_capturing_enabled
from vllm_br.config.compilation import SUPAGraphMode
from vllm_br.forward_context import BatchDescriptor
logger = init_logger(__name__)
@dataclasses.dataclass
class SUPAGraphEntry:
batch_descriptor: BatchDescriptor
supagraph: Optional[torch.supa.SUPAGraph] = None
output: Optional[Any] = None
# for supagraph debugging, track the input addresses
# during capture, and check if they are the same during replay
input_addresses: Optional[list[int]] = None
@dataclasses.dataclass
class SUPAGraphOptions:
debug_log_enable: bool = True
gc_disable: bool = False
weak_ref_output: bool = True
class SUPAGraphWrapper:
"""Wraps a runnable to add SUPA graph capturing and replaying ability. And
provide attribute access to the underlying `runnable` via `__getattr__`.
The workflow of this wrapper in the supagraph dispatching is as follows:
1. At initialization, a runtime mode is assigned to the wrapper (FULL or
PIECEWISE).
2. At runtime, the wrapper receives a runtime_mode and a
batch_descriptor(key) from the forward context and blindly trust them
for supagraph dispatching.
3. If runtime_mode is NONE or runtime_mode does not match the mode of the
wrapper, just call the runnable directly.
4. Otherwise, i.e., the runtime_mode matches the mode of the wrapper,
the wrapper will perform supagraph capture(if key does not exist, create
a new entry and cache it) or replay (if key exists in the cache).
Note: SUPAGraphWrapper does not store persistent buffers or copy any
runtime inputs into that buffers for replay. We assume implementing them
is done outside of the wrapper. That is because we do not make any
assumption on the dynamic shape (batch size) of the runtime inputs, as a
trade-off for staying orthogonal to compilation logic. Nevertheless,
tracing and checking the input addresses to be consistent during replay is
guaranteed when VLLM_LOGGING_LEVEL == "DEBUG".
"""
def __init__(self,
runnable: Callable,
vllm_config: VllmConfig,
runtime_mode: SUPAGraphMode,
supagraph_options: Optional[SUPAGraphOptions] = None):
self.runnable = runnable
self.vllm_config = vllm_config
self.runtime_mode = runtime_mode
self.compilation_config = vllm_config.compilation_config
self.first_run_finished = False
self.is_debugging_mode = envs.VLLM_LOGGING_LEVEL == "DEBUG"
# assert runtime_mode is not NONE(no supagraph), otherwise, we don't
# need to initialize a SUPAGraphWrapper.
assert self.runtime_mode != SUPAGraphMode.NONE
# TODO: in the future, if we want to use multiple
# streams, it might not be safe to share a global pool.
# only investigate this when we use multiple streams
self.graph_pool = current_platform.get_global_graph_pool()
if supagraph_options is None:
supagraph_options = SUPAGraphOptions()
self.supagraph_options = supagraph_options
# the entries for different batch descriptors that we need to capture
# supagraphs for.
self.concrete_supagraph_entries: dict[BatchDescriptor, SUPAGraphEntry]\
= {}
def __getattr__(self, key: str):
# allow accessing the attributes of the runnable.
if hasattr(self.runnable, key):
return getattr(self.runnable, key)
raise AttributeError(f"Attribute {key} not exists in the runnable of "
f"supagraph wrapper: {self.runnable}")
def unwrap(self) -> Callable:
# in case we need to access the original runnable.
return self.runnable
def __call__(self, *args, **kwargs):
forward_context = get_forward_context()
batch_descriptor = forward_context.batch_descriptor
supagraph_runtime_mode = forward_context.cudagraph_runtime_mode
#if supagraph_runtime_mode == SUPAGraphMode.NONE or \
# supagraph_runtime_mode != self.runtime_mode:
if supagraph_runtime_mode == SUPAGraphMode.NONE:
# SUPAGraphMode.NONE could mean the profile run, a warmup run, or
# running without supagraphs.
# We do not trigger capture/replay if the runtime mode is not
# matches. This enables properly dispatching to the correct
# SUPAGraphWrapper when nesting multiple instances with different
# runtime modes.
return self.runnable(*args, **kwargs)
if batch_descriptor not in self.concrete_supagraph_entries:
# create a new entry for this batch descriptor
self.concrete_supagraph_entries[batch_descriptor] = \
SUPAGraphEntry(batch_descriptor=batch_descriptor)
entry = self.concrete_supagraph_entries[batch_descriptor]
if entry.supagraph is None:
if self.supagraph_options.debug_log_enable:
# Since we capture supagraph for many different shapes and
# capturing is fast, we don't need to log it for every
# shape. E.g. we only log it for the first subgraph in
# piecewise mode.
logger.debug("Capturing a supagraph on (%s,%s)",
self.runtime_mode.name, entry.batch_descriptor)
# validate that supagraph capturing is legal at this point.
validate_supagraph_capturing_enabled()
input_addresses = [
x.data_ptr() for x in args if isinstance(x, torch.Tensor)
] + [
x.data_ptr()
for x in kwargs.values() if isinstance(x, torch.Tensor)
]
entry.input_addresses = input_addresses
supagraph = torch.supa.SUPAGraph()
with ExitStack() as stack:
if self.supagraph_options.gc_disable:
# during every model forward for piecewise supagraph
# mode, we will capture many pieces of supagraphs
# (roughly one per layer). running gc again and again
# across layers will make the supagraph capture very slow.
# therefore, we only run gc for the first graph,
# and disable gc for the rest of the graphs.
stack.enter_context(patch("gc.collect", lambda: None))
stack.enter_context(
patch("torch.supa.empty_cache", lambda: None))
if self.graph_pool is not None:
set_graph_pool_id(self.graph_pool)
else:
set_graph_pool_id(current_platform.graph_pool_handle())
# mind-exploding: carefully manage the reference and memory.
with torch.supa.graph(supagraph, pool=self.graph_pool):
# `output` is managed by pytorch's supagraph pool
output = self.runnable(*args, **kwargs)
# (FIXME): torch.ops._C.weak_ref_tensor is not supported
# if self.supagraph_options.weak_ref_output:
# # by converting it to weak ref,
# # the original `output` will immediately be released
# # to save memory. It is only safe to do this for
# # the last graph in piecewise cuadgraph mode, because
# # the output of the last graph will not be used by
# # any other supa graph.
# output = weak_ref_tensors(output)
# here we always use weak ref for the output
# entry.output = weak_ref_tensors(output)
entry.output = output
entry.supagraph = supagraph
compilation_counter.num_cudagraph_captured += 1
# important: we need to return the output, rather than
# the weak ref of the output, so that pytorch can correctly
# manage the memory during supa graph capture
return output
if self.is_debugging_mode:
# check if the input addresses are the same
new_input_addresses = [
x.data_ptr() for x in args if isinstance(x, torch.Tensor)
] + [
x.data_ptr()
for x in kwargs.values() if isinstance(x, torch.Tensor)
]
assert new_input_addresses == entry.input_addresses, (
f"Input addresses for supagraphs are different "
f"during replay. Expected {entry.input_addresses}, "
f"got {new_input_addresses}")
if self.vllm_config.parallel_config.world_size != 1:
# prevent SCCL capturing by using the same stream with SCCL
stream = torch.distributed.get_group_stream(
get_world_group().device_group)
else:
stream = torch_br.supa.Stream()
current_stream = torch.supa.current_stream()
with torch_br.supa.stream(stream):
entry.supagraph.replay()
event = torch.supa.Event()
stream.record_event(event)
current_stream.wait_event(event)
logger.debug(" ========Supa graph reply======== ")
logger.debug(" padded num_tokens size = %s",
batch_descriptor.num_tokens)
return entry.output