# File: vllm/env_override.py (extracted from repository listing)
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
# ruff: noqa: E402
import importlib.util
import os
def _get_torch_cuda_version():
"""Peripheral function to _maybe_set_cuda_compatibility_path().
PyTorch version must not be determined by importing directly
because it will trigger the CUDA initialization, losing the
chance to set the LD_LIBRARY_PATH beforehand.
"""
try:
spec = importlib.util.find_spec("torch")
if not spec:
return None
if spec.origin:
torch_root = os.path.dirname(spec.origin)
elif spec.submodule_search_locations:
torch_root = spec.submodule_search_locations[0]
else:
return None
version_path = os.path.join(torch_root, "version.py")
if not os.path.exists(version_path):
return None
# Load the version module without importing torch
ver_spec = importlib.util.spec_from_file_location("torch.version", version_path)
if not ver_spec or not ver_spec.loader:
return None
module = importlib.util.module_from_spec(ver_spec)
# Avoid registering in sys.modules to not confuse future imports
ver_spec.loader.exec_module(module)
return getattr(module, "cuda", None)
except Exception:
return None
def _maybe_set_cuda_compatibility_path():
"""Set LD_LIBRARY_PATH for CUDA forward compatibility if enabled.
Must run before 'import torch' since torch loads CUDA shared libraries
at import time and the dynamic linker only consults LD_LIBRARY_PATH when
a library is first loaded.
CUDA forward compatibility is only supported on select professional and
datacenter NVIDIA GPUs. Consumer GPUs (GeForce, RTX) do not support it
and will get Error 803 if compat libs are loaded.
"""
enable = os.environ.get("VLLM_ENABLE_CUDA_COMPATIBILITY", "0").strip().lower() in (
"1",
"true",
)
if not enable:
return
cuda_compat_path = os.environ.get("VLLM_CUDA_COMPATIBILITY_PATH", "")
if not cuda_compat_path or not os.path.isdir(cuda_compat_path):
conda_prefix = os.environ.get("CONDA_PREFIX", "")
conda_compat = os.path.join(conda_prefix, "cuda-compat")
if conda_prefix and os.path.isdir(conda_compat):
cuda_compat_path = conda_compat
if not cuda_compat_path or not os.path.isdir(cuda_compat_path):
torch_cuda_version = _get_torch_cuda_version()
if torch_cuda_version:
default_path = f"/usr/local/cuda-{torch_cuda_version}/compat"
if os.path.isdir(default_path):
cuda_compat_path = default_path
if not cuda_compat_path or not os.path.isdir(cuda_compat_path):
return
norm_path = os.path.normpath(cuda_compat_path)
existing = os.environ.get("LD_LIBRARY_PATH", "")
ld_paths = existing.split(os.pathsep) if existing else []
if ld_paths and ld_paths[0] and os.path.normpath(ld_paths[0]) == norm_path:
return # Already at the front
new_paths = [norm_path] + [
p for p in ld_paths if not p or os.path.normpath(p) != norm_path
]
os.environ["LD_LIBRARY_PATH"] = os.pathsep.join(new_paths)
# Must run before the torch import below: torch loads CUDA shared libraries
# at import time, after which LD_LIBRARY_PATH changes have no effect.
_maybe_set_cuda_compatibility_path()
import torch
from vllm.logger import init_logger
from vllm.utils.torch_utils import is_torch_equal
logger = init_logger(__name__)
# set some common config/environment variables that should be set
# for all processes created by vllm and all processes
# that interact with vllm workers.
# they are executed whenever `import vllm` is called.
# see https://github.com/vllm-project/vllm/pull/15951
# it avoids unintentional cuda initialization from torch.cuda.is_available()
os.environ["PYTORCH_NVML_BASED_CUDA_CHECK"] = "1"
# see https://github.com/vllm-project/vllm/issues/10480
os.environ["TORCHINDUCTOR_COMPILE_THREADS"] = "1"
# see https://github.com/vllm-project/vllm/issues/10619
torch._inductor.config.compile_threads = 1
# ===================================================
# torch 2.9 Inductor PythonWrapperCodegen monkeypatch
# ===================================================
# This change monkeypatches memory_plan_reuse in pytorch 2.9.0 to work around
# a test failure for test_multi_graph_piecewise_compile_outputs_equal.
# For more context, see https://github.com/pytorch/pytorch/pull/165514.
def memory_plan_reuse_patched(self):
    """Patched PythonWrapperCodegen.memory_plan_reuse for torch 2.9.0.

    Works around a test failure for
    test_multi_graph_piecewise_compile_outputs_equal by resolving partition
    output names through the subgraph's partition signature when present.
    See https://github.com/pytorch/pytorch/pull/165514 for context.
    """
    import torch._inductor.ir as ir
    from torch._inductor.codegen.wrapper import (
        EnterSubgraphLine,
        ExitSubgraphLine,
        MemoryPlanningLine,
        MemoryPlanningState,
        SubgraphPythonWrapperCodegen,
    )
    from torch._inductor.virtualized import V

    def get_output_names(graph_outputs) -> list[str]:
        # Synthesize stable names for constant-like outputs; real buffers
        # keep their own names.
        import itertools

        names = []
        shape_counter = itertools.count(0)
        none_counter = itertools.count(0)
        for node in graph_outputs:
            if isinstance(node, ir.NoneAsConstantBuffer):
                names.append(f"{V.graph.name}_none{next(none_counter)}")
            elif isinstance(node, ir.ShapeAsConstantBuffer):
                names.append(f"{V.graph.name}_shape{next(shape_counter)}")
            else:
                names.append(node.get_name())
        return names

    # When codegen-ing a subgraph with a partition signature, the outputs
    # come from the signature rather than from the graph itself.
    if (
        isinstance(V.graph.wrapper_code, SubgraphPythonWrapperCodegen)
        and V.graph.wrapper_code.partition_signatures is not None
    ):
        out_names = get_output_names(
            V.graph.wrapper_code.partition_signatures.output_nodes
        )
    else:
        out_names = V.graph.get_output_names()

    # Trailing memory-planning lines whose buffers are not outputs are dead.
    while (
        self.lines
        and isinstance(self.lines[-1], MemoryPlanningLine)
        and self.lines[-1].node.name not in out_names  # type: ignore[attr-defined]
    ):
        # these lines will be pointless
        self.lines.pop()

    # codegen allocations in two passes
    planning_states = [MemoryPlanningState()]
    past_planning_states = []
    for i in range(len(self.lines)):
        line = self.lines[i]
        if isinstance(line, MemoryPlanningLine):
            self.lines[i] = line.plan(planning_states[-1])
        elif isinstance(line, EnterSubgraphLine):
            planning_states.append(MemoryPlanningState())
        elif isinstance(line, ExitSubgraphLine):
            past_planning_states.append(planning_states.pop())
    # Pop the top-level planning state; every subgraph state must have been
    # consumed by a matching ExitSubgraphLine.
    past_planning_states.append(planning_states.pop())
    assert len(planning_states) == 0
# ===================================================
# torch 2.9 Inductor get_graph_partition_signature monkeypatch
# ===================================================
# This change monkeypatches get_graph_partition_signature in pytorch 2.9.0 to
# fix inductor partition + attention-nvfp4 quant fusion, tested in
# `tests/compile/test_fusion_attn.py::test_attn_quant`.
# For more context, see https://github.com/pytorch/pytorch/pull/165815.
def get_graph_partition_signature_patched(
    self, partitions, skip_cudagraphs: list[bool]
):
    """
    Gets signature for each graph partition, including input nodes, output nodes, and
    whether deallocating an input within graph partition.

    Patched for torch 2.9.0 to fix inductor partition + attention-nvfp4
    quant fusion; see https://github.com/pytorch/pytorch/pull/165815.
    """
    from torch._inductor import dependencies
    from torch._inductor.ir import GraphPartitionSignature, MutationOutput, NoneLayout
    from torch._inductor.virtualized import V
    from torch.utils._ordered_set import OrderedSet

    signatures = []
    # Graph outputs not yet produced by a later partition; partitions are
    # walked in reverse order below.
    unmet_output_names = OrderedSet(V.graph.get_output_names())
    name_to_node = self.get_name_to_nodes()

    def is_none_layout(buf_name: str) -> bool:
        """
        Checks if buf_name is NoneLayout. Buffers with NoneLayout is not allocated
        so graph partition should not take it as inputs or outputs.
        """
        buf = self.name_to_buf.get(buf_name, None)
        if buf is None:
            return False
        if isinstance(buf.node.layout, NoneLayout):
            # For mutation outputs, follow the real (mutated) buffer name.
            if isinstance(buf.node, MutationOutput) and (
                real_name := self.mutation_real_name.get(buf_name, None)
            ):
                return is_none_layout(real_name)
            return True
        return False

    for partition, skip_cudagraph in zip(
        reversed(partitions), reversed(skip_cudagraphs)
    ):
        output_names: OrderedSet[str] = OrderedSet()
        for node in partition:
            output_names.update(node.outputs_by_name.keys())
        returned_output_names = output_names.intersection(unmet_output_names)

        # all reads/writes are partition inputs except those generated
        # within the partition and tensor constants
        read_writes = dependencies.ReadWrites.merge_list(
            [node.read_writes for node in partition]
        )

        # WeakDep is fake dependency on unused buffer. It should not appear
        # in partition_input_names for inputs that are actually read or written.
        partition_input_names = (
            OrderedSet(
                [
                    x.name
                    for x in read_writes.reads | read_writes.writes
                    if not is_none_layout(x.name)
                ]
            )
            - output_names
        )
        # Map mutated buffers back to the name the partition actually uses.
        partition_input_names = OrderedSet(
            self.mutation_real_name.get(name, name) for name in partition_input_names
        )

        buffer_names_to_free: OrderedSet[str] = OrderedSet()
        for node in partition:
            buffer_names_to_free.update(node.last_usage)

        # buffer_names_to_free may contain buffers allocated in previous
        # graph partitions. These buffers should also be a partition
        # input.
        extra_input_names = [
            name
            for name in (buffer_names_to_free - output_names)
            if name in name_to_node
        ]
        partition_input_names.update(extra_input_names)

        input_nodes = {
            name: name_to_node[name]
            for name in partition_input_names
            if name in name_to_node
        }
        input_deallocation = {
            name: name in buffer_names_to_free
            for name in partition_input_names
            if name in name_to_node
        }

        # if an input tensor is not freed in the partition function, it should
        # also be returned as an output. This brings benefits to cudagraph
        # since the returned output tensor is a cudagraph managed tensor with
        # a static tensor address.
        extra_output_names = [
            name
            for name in partition_input_names
            if name in name_to_node and name not in buffer_names_to_free
        ]
        returned_output_names.update(extra_output_names)
        returned_output_names = OrderedSet(
            self.mutation_real_name.get(name, name) for name in returned_output_names
        )
        output_nodes = [
            name_to_node[name]
            for name in returned_output_names
            if not is_none_layout(name)
        ]
        constant_names = [
            name for name in partition_input_names if name in V.graph.constants
        ]
        symbol_inputs = self.get_graph_partition_symbol_inputs(partition, input_nodes)
        partition_signature = GraphPartitionSignature(
            symbol_inputs,
            input_nodes,
            output_nodes,
            input_deallocation,
            skip_cudagraph,
            constant_names,
        )
        signatures.append(partition_signature)

        # Inputs of this partition must be produced earlier; outputs we
        # returned are now met.
        unmet_output_names = partition_input_names.union(
            unmet_output_names - returned_output_names
        )
    # Partitions were processed in reverse; restore original order.
    return signatures[::-1]
# ========================================
# torch 2.9 Inductor Scheduler monkeypatch
# ========================================
# This change monkeypatches a function in Inductor to work around the following
# bug: https://github.com/vllm-project/vllm/issues/26678
#
# The bug occurs when `use_inductor_graph_partition` is turned on and there
# exists operators inside of `splitting_ops` that have an in-place mutation. In
# vllm, this specifically occurs on the operator
# vllm.unified_attention_with_output. In this case, inductor does not populate
# the inductor IR's `origin_node` field, causing an assertion error when trying
# to access the node's `origin_node` field.
#
# So, we will monkeypatch torch._inductor.scheduler.Scheduler.should_partition
# so that it does not access the inductor IR node's `origin_node` field and just
# returns True if a node is registered as having a custom partition function.
# This is ok for now since vllm's implementation of the custom partition
# functions just return True.
# ========================================
def should_partition_patched(self, node, should_log: bool = False) -> bool:
    # This is a patched version of
    # torch._inductor.scheduler.Scheduler.should_partition that modifies
    # the following piece of code so that we always return True:
    # https://github.com/pytorch/pytorch/blob/ecb53078faf86ca1b33277df33b82985675bb011/torch/_inductor/scheduler.py#L4712-L4724
    """Return True if we should partition the inductor graph on this node"""
    import torch._inductor.ir as ir
    from torch._inductor.scheduler import (
        BaseSchedulerNode,
        FusedSchedulerNode,
    )
    from torch._inductor.utils import (
        _unstable_customized_partition_wrapper,
        is_cudagraph_unsafe_op,
        maybe_log_cudagraph_partition,
    )

    # Allow users to manually specify if a node should be partitioned
    # Can only do this for FallbackKernels
    ir_node = node.node
    if isinstance(ir_node, torch._inductor.ir.FallbackKernel) and (
        op := ir_node.op_overload
    ):
        op_overload_packet_name = op.name()
        op_overload_name = (
            f"{op_overload_packet_name}.{op._overloadname}"
            if isinstance(op, torch._ops.OpOverload)
            else op_overload_packet_name
        )
        # Match either the packet name or the fully-qualified overload name
        # against the user-registered custom partition ops.
        if (
            op_overload_packet_name
            in torch._inductor.config.custom_should_partition_ops
            or op_overload_name in torch._inductor.config.custom_should_partition_ops
        ):
            assert isinstance(op, torch._ops.OpOverload)
            return True

    # When not using cudagraphs, keep all kernels in the `call` function
    # instead of graph partition functions, since graph partition only brings
    # benefit to cudagraph
    if (
        not torch._inductor.config.triton.cudagraphs
        and _unstable_customized_partition_wrapper.wrapper is None
    ):
        return True

    # avoid duplicating logs when should_partition is called multiple times
    # on the same node
    def noop_log(msg: str, node: BaseSchedulerNode | None) -> None:
        return

    log_partition_reason = maybe_log_cudagraph_partition if should_log else noop_log

    # A fused node partitions if any of its constituents does.
    if isinstance(node, FusedSchedulerNode):
        return any(self.should_partition(snode) for snode in node.snodes)

    assert node.node is not None

    if not node.is_gpu():
        log_partition_reason("non gpu ops", node=node)
        return True

    if isinstance(node.node, ir.DeviceCopy):
        log_partition_reason("DeviceCopy ops", node=node)
        return True

    if isinstance(node.node, ir.Conditional):
        log_partition_reason("Conditional ops", node=node)
        return True

    if getattr(node.node, "unbacked_bindings", None):
        log_partition_reason("unbacked binding ops", node=node)
        return True

    if is_cudagraph_unsafe_op(node.node):
        log_partition_reason("CUDAGraph-unsafe custom ops", node=node)
        return True

    return False
def _update_scheduler_patched(self) -> None:
    # Copied from torch._inductor.graph.GraphLowering._update_scheduler. Patches
    # this method so that we can patch Scheduler.should_partition with the
    # function above
    """
    (Re)initializes the scheduler member. When initializing the scheduler, no CUBIN
    files should be generated (to avoid biasing any benchmarks and pessimizing
    fusion decisions).
    """
    import torch._inductor.config as config
    from torch._inductor.scheduler import Scheduler

    # Install the monkeypatches before constructing the Scheduler so the
    # patched methods are in effect during scheduling.
    Scheduler.should_partition = should_partition_patched
    Scheduler.get_graph_partition_signature = get_graph_partition_signature_patched
    with config.patch("triton.store_cubin", False):
        self.scheduler = Scheduler(self.operations)
# ===================================================
# torch 2.9 Inductor get_raw_stream workaround
# ===================================================
# Workaround for TorchInductor autotune using get_raw_stream() without defining it.
# This occurs when compile_sizes > 1 in compilation_config.
# For more context, see https://github.com/vllm-project/vllm/issues/30905.
def _patch_get_raw_stream_if_needed():
    """Workaround for TorchInductor autotune get_raw_stream() bug."""
    from vllm.utils.torch_utils import is_torch_equal

    # The bug is specific to torch 2.9.0 / 2.9.1; other versions need nothing.
    if not (is_torch_equal("2.9.0") or is_torch_equal("2.9.1")):
        return

    import builtins

    # Check if CUDA functionality is available without initializing CUDA:
    # _cuda_getCurrentRawStream only exists in CUDA builds of PyTorch.
    if hasattr(torch._C, "_cuda_getCurrentRawStream"):
        from torch._C import _cuda_getCurrentRawStream as _get_raw_stream

        builtins.get_raw_stream = _get_raw_stream  # type: ignore[attr-defined]


_patch_get_raw_stream_if_needed()
if is_torch_equal("2.9.0"):
    from torch._inductor.codegen.wrapper import PythonWrapperCodegen
    from torch._inductor.graph import GraphLowering
    from torch.utils._config_module import _Config, _ConfigEntry

    # `custom_should_partition_ops` is a new config after 2.9.0. So this would
    # not overwrite any user configs.
    torch._inductor.config._config["custom_should_partition_ops"] = _ConfigEntry(
        _Config(default=[])
    )
    # Install the 2.9.0-specific monkeypatches defined above.
    PythonWrapperCodegen.memory_plan_reuse = memory_plan_reuse_patched
    GraphLowering._update_scheduler = _update_scheduler_patched
# ===================================================
# torch 2.11 Inductor constrain_to_fx_strides monkeypatch
# ===================================================
# Patch the inductor's `constrain_to_fx_strides` to handle opaque
# (non-tensor) arguments. The original calls `.stride()` on every FX
# arg's meta value, which crashes on FakeScriptObject (the compile-time
# proxy for hoisted opaque types). The patched version skips args
# whose meta value is not a torch.Tensor.
# Upstream issue: https://github.com/pytorch/pytorch/issues/175973
from vllm.utils.torch_utils import is_torch_equal_or_newer

if is_torch_equal_or_newer("2.11.0.dev"):
    import torch._inductor.ir as _ir
    import torch._inductor.lowering as _lowering
    from torch._inductor.virtualized import V as _V

    # Kept so the original implementation remains reachable for debugging.
    _orig_constrain = _lowering.constrain_to_fx_strides

    def _patched_constrain_to_fx_strides(fx_node, *args, **kwargs):
        # Same as the upstream constrain_to_fx_strides, except args whose FX
        # meta value is not a torch.Tensor (e.g. FakeScriptObject proxies for
        # hoisted opaque types) are passed through unconstrained instead of
        # crashing on `.stride()`.
        def apply_constraint(arg, fx_arg):
            if isinstance(arg, _ir.IRNode):
                meta_val = fx_arg.meta.get("val")
                if isinstance(meta_val, torch.Tensor):
                    stride_order = _ir.get_stride_order(
                        meta_val.stride(), _V.graph.sizevars.shape_env
                    )
                    return _ir.ExternKernel.require_stride_order(arg, stride_order)
                # Non-tensor meta value: skip the stride constraint.
                return arg
            if isinstance(arg, dict):
                # Constrain each value against its matching FX value.
                return {key: apply_constraint(arg[key], fx_arg[key]) for key in arg}
            return arg

        args = tuple(
            apply_constraint(arg, fx_arg) for arg, fx_arg in zip(args, fx_node.args)
        )
        kwargs = {k: apply_constraint(v, fx_node.kwargs[k]) for k, v in kwargs.items()}
        return args, kwargs

    _lowering.constrain_to_fx_strides = _patched_constrain_to_fx_strides