266 lines
8.3 KiB
Python
266 lines
8.3 KiB
Python
# SPDX-License-Identifier: Apache-2.0
|
|
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
|
|
|
|
import atexit
|
|
import functools
|
|
import inspect
|
|
import os
|
|
import traceback
|
|
from collections.abc import Mapping
|
|
from contextlib import contextmanager
|
|
from typing import Any
|
|
|
|
from vllm.logger import init_logger
|
|
from vllm.tracing.utils import TRACE_HEADERS, LoadingSpanAttributes
|
|
|
|
logger = init_logger(__name__)
|
|
|
|
# OpenTelemetry is an optional dependency: import everything this module
# needs in one guarded block and record whether the import succeeded.
try:
    from opentelemetry import trace
    from opentelemetry.context.context import Context
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter as OTLPGrpcExporter,
    )
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
        OTLPSpanExporter as OTLPHttpExporter,
    )
    from opentelemetry.propagate import inject
    from opentelemetry.sdk.environment_variables import (
        OTEL_EXPORTER_OTLP_TRACES_PROTOCOL,
    )
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.trace import (
        SpanKind,  # noqa: F401
        Tracer,
        set_tracer_provider,
    )
    from opentelemetry.trace.propagation.tracecontext import (
        TraceContextTextMapPropagator,
    )

    _IS_OTEL_AVAILABLE = True
    otel_import_error_traceback = None
except ImportError:
    _IS_OTEL_AVAILABLE = False
    # Keep the formatted traceback so init_otel_tracer can include the root
    # cause in its error message.
    otel_import_error_traceback = traceback.format_exc()
    # Placeholders so that the module still imports: names used in
    # annotations (Context, Tracer, SpanKind) fall back to Any, the rest to
    # None. Only this subset of the imported names gets a placeholder; the
    # others stay undefined when the import fails.
    trace = None  # type: ignore
    Context = Any  # type: ignore
    Tracer = Any  # type: ignore
    inject = None  # type: ignore
    Resource = None  # type: ignore
    SpanKind = Any  # type: ignore
|
|
|
|
|
|
def is_otel_available() -> bool:
    """Report whether the OpenTelemetry imports succeeded at module load."""
    return _IS_OTEL_AVAILABLE
|
|
|
|
|
|
def init_otel_tracer(
    instrumenting_module_name: str,
    otlp_traces_endpoint: str,
    extra_attributes: dict[str, str] | None = None,
) -> Tracer:
    """Create a tracer provider, install it globally and return a tracer.

    Args:
        instrumenting_module_name: Name recorded as a resource attribute and
            used to obtain the returned tracer.
        otlp_traces_endpoint: Endpoint handed to the span exporter; it is
            also written to ``OTEL_EXPORTER_OTLP_TRACES_ENDPOINT``.
        extra_attributes: Optional resource attributes merged on top of the
            built-in ones.

    Returns:
        A tracer obtained from the newly created provider.

    Raises:
        ValueError: If the OpenTelemetry packages could not be imported.
    """
    if not _IS_OTEL_AVAILABLE:
        raise ValueError(
            "OpenTelemetry is not available. Unable to initialize "
            "a tracer. Ensure OpenTelemetry packages are installed. "
            f"Original error:\n{otel_import_error_traceback}"
        )

    # Publish the endpoint through the environment so that child processes
    # can inherit it.
    os.environ["OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"] = otlp_traces_endpoint

    # Caller-supplied attributes are merged last, so they win on key clashes.
    resource = Resource.create(
        {
            "vllm.instrumenting_module_name": instrumenting_module_name,
            "vllm.process_id": str(os.getpid()),
            **(extra_attributes or {}),
        }
    )

    provider = TracerProvider(resource=resource)
    exporter = get_span_exporter(otlp_traces_endpoint)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    set_tracer_provider(provider)

    # Shut the provider down when the interpreter exits.
    atexit.register(provider.shutdown)

    return provider.get_tracer(instrumenting_module_name)
|
|
|
|
|
|
def get_span_exporter(endpoint):
    """Return an OTLP span exporter for ``endpoint``.

    The transport is chosen from the environment variable named by
    ``OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`` and defaults to ``"grpc"``.
    The gRPC exporter is always created with ``insecure=True``.

    Raises:
        ValueError: If the configured protocol is neither ``"grpc"`` nor
            ``"http/protobuf"``.
    """
    protocol = os.environ.get(OTEL_EXPORTER_OTLP_TRACES_PROTOCOL, "grpc")

    if protocol == "grpc":
        return OTLPGrpcExporter(endpoint=endpoint, insecure=True)
    if protocol == "http/protobuf":
        return OTLPHttpExporter(endpoint=endpoint)

    raise ValueError(f"Unsupported OTLP protocol '{protocol}' is configured")
|
|
|
|
|
|
def init_otel_worker_tracer(
|
|
instrumenting_module_name: str,
|
|
process_kind: str,
|
|
process_name: str,
|
|
) -> Tracer:
|
|
"""
|
|
Backend-specific initialization for OpenTelemetry in a worker process.
|
|
"""
|
|
# Initialize the tracer if an OTLP endpoint is configured.
|
|
# The endpoint is propagated via environment variable from the main process.
|
|
otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
|
|
if not otlp_endpoint:
|
|
return None
|
|
|
|
extra_attrs = {
|
|
"vllm.process_kind": process_kind,
|
|
"vllm.process_name": process_name,
|
|
}
|
|
|
|
return init_otel_tracer(instrumenting_module_name, otlp_endpoint, extra_attrs)
|
|
|
|
|
|
def extract_trace_context(headers: Mapping[str, str] | None) -> Context | None:
    """Extract a trace context from HTTP headers.

    Returns ``None`` when OpenTelemetry is unavailable or when ``headers`` is
    ``None`` or empty; otherwise returns whatever
    ``TraceContextTextMapPropagator().extract`` produces for the headers.
    """
    if not _IS_OTEL_AVAILABLE or not headers:
        return None
    return TraceContextTextMapPropagator().extract(headers)
|
|
|
|
|
|
def instrument_otel(func, span_name, attributes, record_exception):
    """Internal wrapper logic for sync and async functions.

    Args:
        func: The function (plain or coroutine function) to trace.
        span_name: Span name to use; falls back to ``func.__qualname__`` when
            falsy.
        attributes: Optional extra span attributes; they override the
            automatically derived code attributes on key clashes.
        record_exception: Forwarded to ``start_as_current_span``.

    Returns:
        A wrapper that runs ``func`` inside a span, with the trace context
        exported to ``os.environ`` for the duration of the call. When
        OpenTelemetry is not available, ``func`` itself is returned.
    """
    # Without OpenTelemetry the module-level ``trace`` is None, so a wrapper
    # would raise AttributeError on every call. Degrade to a no-op instead,
    # like manual_instrument_otel and propagate_trace_to_env do.
    if not _IS_OTEL_AVAILABLE:
        return func

    # Pre-calculate static code attributes once (these don't change).
    code_attrs = {
        LoadingSpanAttributes.CODE_FUNCTION: func.__qualname__,
        LoadingSpanAttributes.CODE_NAMESPACE: func.__module__,
        LoadingSpanAttributes.CODE_FILEPATH: func.__code__.co_filename,
        LoadingSpanAttributes.CODE_LINENO: str(func.__code__.co_firstlineno),
    }
    if attributes:
        code_attrs.update(attributes)

    final_span_name = span_name or func.__qualname__
    module_name = func.__module__

    def _start_span():
        """Build the span context manager for one invocation of ``func``."""
        # Tracer and parent context are resolved per call, not at decoration
        # time, so they reflect the state at the moment of the call.
        tracer = trace.get_tracer(module_name)
        return tracer.start_as_current_span(
            final_span_name,
            context=_get_smart_context(),
            attributes=code_attrs,
            record_exception=record_exception,
        )

    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        with _start_span(), propagate_trace_to_env():
            return await func(*args, **kwargs)

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        with _start_span(), propagate_trace_to_env():
            return func(*args, **kwargs)

    return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
|
|
|
|
|
|
def manual_instrument_otel(
    span_name: str,
    start_time: int,
    end_time: int | None = None,
    attributes: dict[str, Any] | None = None,
    context: Context | None = None,
    kind: Any = None,  # SpanKind, but typed as Any for when OTEL unavailable
):
    """Create a span with an explicit start time and end it immediately.

    Does nothing when OpenTelemetry is unavailable.

    Args:
        span_name: Name of the span.
        start_time: Start timestamp passed to ``start_span``.
            NOTE(review): presumably nanoseconds since the epoch, per the
            OpenTelemetry convention - confirm against callers.
        end_time: Optional end timestamp; when ``None`` the span is ended
            without an explicit timestamp.
        attributes: Optional attributes set on the span after creation.
        context: Parent context; when ``None`` the result of
            ``_get_smart_context()`` is used instead.
        kind: Optional span kind; only forwarded when not ``None``.
    """
    if not _IS_OTEL_AVAILABLE:
        return

    parent_ctx = _get_smart_context() if context is None else context

    # Only pass ``kind`` when the caller supplied one, so the tracer's own
    # default applies otherwise.
    optional_kwargs: dict[str, Any] = {} if kind is None else {"kind": kind}

    span = trace.get_tracer(__name__).start_span(
        name=span_name,
        context=parent_ctx,
        start_time=start_time,
        **optional_kwargs,
    )
    if attributes:
        span.set_attributes(attributes)

    if end_time is None:
        span.end()
    else:
        span.end(end_time=end_time)
|
|
|
|
|
|
def _get_smart_context() -> Context | None:
    """Determine the parent context for a new span.

    1. If a valid span is already current in this process, return ``None``.
       Callers pass the result as ``context=`` to the tracer.
       NOTE(review): this presumably makes the tracer fall back to the
       current context - confirm against the OpenTelemetry API.
    2. Otherwise extract the context from ``os.environ``, accepting both the
       lower-case and upper-case spelling of ``traceparent``/``tracestate``.
    """
    if trace.get_current_span().get_span_context().is_valid:
        return None

    carrier = {}
    for header in ("traceparent", "tracestate"):
        # Prefer the lower-case variable; fall back to the upper-case one.
        value = os.environ.get(header, os.environ.get(header.upper()))
        if value:
            carrier[header] = value

    if not carrier:
        # Nothing usable found: hand the propagator the whole environment.
        carrier = dict(os.environ)

    return TraceContextTextMapPropagator().extract(carrier)
|
|
|
|
|
|
@contextmanager
def propagate_trace_to_env():
    """Temporarily inject the current OTel context into ``os.environ``.

    Subprocesses (such as vLLM workers) spawned inside the ``with`` body
    inherit the injected trace headers. On exit, every key listed in
    ``TRACE_HEADERS`` is restored to its previous value, or removed if it was
    not set before. When OpenTelemetry is unavailable this is a no-op.
    """
    if not _IS_OTEL_AVAILABLE:
        yield
        return

    # Snapshot the keys we are about to overwrite (None marks "was unset").
    saved = {}
    for header in TRACE_HEADERS:
        saved[header] = os.environ.get(header)

    try:
        # inject() writes 'traceparent' and 'tracestate' to os.environ
        inject(os.environ)
        yield
    finally:
        # Put the environment back exactly as it was.
        for header, previous in saved.items():
            if previous is not None:
                os.environ[header] = previous
            else:
                os.environ.pop(header, None)
|