Files
sglang/test/srt/test_tracing.py
2025-09-15 02:08:02 +08:00

274 lines
8.8 KiB
Python

import multiprocessing as mp
import os
import subprocess
import time
import unittest
from dataclasses import dataclass
from typing import Any, Dict, Optional
import requests
import zmq
from sglang import Engine
from sglang.srt.managers.io_struct import TokenizedGenerateReqInput
from sglang.srt.tracing.trace import *
from sglang.srt.utils import get_zmq_socket, kill_process_tree
from sglang.test.test_utils import (
DEFAULT_SMALL_MODEL_NAME_FOR_TEST,
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
DEFAULT_URL_FOR_TEST,
CustomTestCase,
popen_launch_server,
)
@dataclass
class Req:
rid: int
trace_context: Optional[Dict[str, Any]] = None
class TestTrace(CustomTestCase):
def __launch_otel_jaeger(self):
cmd = [
"docker",
"compose",
"-f",
"../../examples/monitoring/tracing_compose.yaml",
"up",
"-d",
]
proc = subprocess.run(cmd)
if proc.returncode != 0:
print("launch opentelemetry collector and jaeger docker err")
return False
return True
def __stop_otel_jaeger(self):
cmd = [
"docker",
"compose",
"-f",
"../../examples/monitoring/tracing_compose.yaml",
"down",
]
proc = subprocess.run(cmd)
if proc.returncode != 0:
print("stop opentelemetry collector and jaeger docker err")
return False
return True
def __clear_trace_file(self):
try:
os.remove("/tmp/otel_trace.json")
except:
pass
def test_trace_enable(self):
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
process = popen_launch_server(
DEFAULT_SMALL_MODEL_NAME_FOR_TEST,
DEFAULT_URL_FOR_TEST,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=["--enable-trace", "--oltp-traces-endpoint", "0.0.0.0:4317"],
)
try:
# Make some requests to generate trace data
response = requests.get(f"{DEFAULT_URL_FOR_TEST}/health_generate")
self.assertEqual(response.status_code, 200)
response = requests.post(
f"{DEFAULT_URL_FOR_TEST}/generate",
json={
"text": "The capital of France is",
"sampling_params": {
"temperature": 0,
"max_new_tokens": 32,
},
"stream": True,
},
stream=True,
)
for _ in response.iter_lines(decode_unicode=False):
pass
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
kill_process_tree(process.pid)
assert self.__stop_otel_jaeger()
def test_trace_engine_enable(self):
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
prompt = "Today is a sunny day and I like"
model_path = DEFAULT_SMALL_MODEL_NAME_FOR_TEST
sampling_params = {"temperature": 0, "max_new_tokens": 8}
engine = Engine(
model_path=model_path,
random_seed=42,
enable_trace=True,
oltp_traces_endpoint="localhost:4317",
)
try:
engine.generate(prompt, sampling_params)
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
engine.shutdown()
assert self.__stop_otel_jaeger()
def test_trace_engine_encode(self):
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
prompt = "Today is a sunny day and I like"
model_path = "Qwen/Qwen2-7B"
engine = Engine(
model_path=model_path,
random_seed=42,
enable_trace=True,
oltp_traces_endpoint="localhost:4317",
is_embedding=True,
)
try:
engine.encode(prompt)
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
engine.shutdown()
assert self.__stop_otel_jaeger()
def test_slice_trace_simple(self):
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
try:
process_tracing_init("0.0.0.0:4317", "test")
trace_set_thread_info("Test")
trace_req_start(0)
trace_slice_start("test slice", 0)
time.sleep(1)
trace_slice_end("test slice", 0)
trace_req_finish(0)
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
assert self.__stop_otel_jaeger()
def test_slice_trace_complex(self):
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
try:
process_tracing_init("0.0.0.0:4317", "test")
trace_set_thread_info("Test")
trace_req_start(0)
trace_slice_start("", 0, anonymous=True)
time.sleep(1)
trace_slice_end("slice A", 0, auto_next_anon=True)
time.sleep(1)
trace_slice_end("slice B", 0, auto_next_anon=True)
time.sleep(1)
trace_slice_end("slice C", 0, thread_finish_flag=True)
trace_req_finish(0)
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
assert self.__stop_otel_jaeger()
def test_trace_context_propagete(self):
def __process_work():
process_tracing_init("0.0.0.0:4317", "test")
trace_set_thread_info("Sub Process")
context = zmq.Context(2)
recv_from_main = get_zmq_socket(
context, zmq.PULL, "ipc:///tmp/zmq_test.ipc", True
)
try:
req = recv_from_main.recv_pyobj()
trace_set_proc_propagate_context(req.rid, req.trace_context)
trace_slice_start("work", req.rid)
time.sleep(1)
trace_slice_end("work", req.rid, thread_finish_flag=True)
finally:
recv_from_main.close()
context.term()
self.__clear_trace_file()
assert self.__launch_otel_jaeger()
context = zmq.Context(2)
send_to_subproc = get_zmq_socket(
context, zmq.PUSH, "ipc:///tmp/zmq_test.ipc", False
)
try:
process_tracing_init("0.0.0.0:4317", "test")
trace_set_thread_info("Main Process")
subproc = mp.Process(target=__process_work)
subproc.start()
# sleep for a few second to ensure subprocess init
time.sleep(1)
req = Req(rid=0)
trace_req_start(req.rid)
trace_slice_start("dispatch", req.rid)
time.sleep(1)
req.trace_context = trace_get_proc_propagate_context(req.rid)
send_to_subproc.send_pyobj(req)
trace_slice_end("dispatch", req.rid)
subproc.join()
trace_req_finish(req.rid)
# sleep for a few seconds to wait for opentelemetry collector to asynchronously export data to file.
time.sleep(10)
# check trace file
assert os.path.isfile("/tmp/otel_trace.json"), "trace file not exist"
assert os.path.getsize("/tmp/otel_trace.json") > 0, "trace file is empty"
finally:
send_to_subproc.close()
context.term()
assert self.__stop_otel_jaeger()
if __name__ == "__main__":
unittest.main()