Files
sglang/python/sglang/test/test_utils.py
2025-10-12 06:46:36 -07:00

1805 lines
54 KiB
Python

"""Common utilities for testing and benchmarking"""
import argparse
import asyncio
import copy
import json
import logging
import os
import random
import re
import subprocess
import sys
import threading
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, List, Optional, Tuple
import aiohttp
import numpy as np
import requests
import torch
import torch.nn.functional as F
from sglang.bench_serving import run_benchmark
from sglang.global_config import global_config
from sglang.srt.utils import (
get_bool_env_var,
get_device,
is_port_available,
kill_process_tree,
retry,
)
from sglang.test.run_eval import run_eval
from sglang.utils import get_exception_traceback
# General test models
DEFAULT_MODEL_NAME_FOR_TEST = "meta-llama/Llama-3.1-8B-Instruct"
DEFAULT_SMALL_MODEL_NAME_FOR_TEST = "meta-llama/Llama-3.2-1B-Instruct"
DEFAULT_SMALL_MODEL_NAME_FOR_TEST_BASE = "meta-llama/Llama-3.2-1B"
DEFAULT_SMALL_MODEL_NAME_FOR_TEST_SCORE = "Qwen/Qwen3-Reranker-0.6B"
DEFAULT_MOE_MODEL_NAME_FOR_TEST = "mistralai/Mixtral-8x7B-Instruct-v0.1"
DEFAULT_SMALL_MOE_MODEL_NAME_FOR_TEST_BASE = "Qwen/Qwen1.5-MoE-A2.7B"
DEFAULT_SMALL_MOE_MODEL_NAME_FOR_TEST_CHAT = "Qwen/Qwen1.5-MoE-A2.7B-Chat"
# MLA test models
DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST = "Alibaba-NLP/gte-Qwen2-1.5B-instruct"
DEFAULT_SMALL_CROSS_ENCODER_MODEL_NAME_FOR_TEST = "cross-encoder/ms-marco-MiniLM-L6-v2"
DEFAULT_MLA_MODEL_NAME_FOR_TEST = "deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct"
DEFAULT_MLA_FP8_MODEL_NAME_FOR_TEST = "neuralmagic/DeepSeek-Coder-V2-Lite-Instruct-FP8"
DEFAULT_MODEL_NAME_FOR_TEST_MLA = "lmsys/sglang-ci-dsv3-test"
DEFAULT_MODEL_NAME_FOR_TEST_MLA_NEXTN = "lmsys/sglang-ci-dsv3-test-NextN"
# NVFP4 models
DEFAULT_DEEPSEEK_NVFP4_MODEL_FOR_TEST = "nvidia/DeepSeek-R1-0528-FP4"
# FP8 models
DEFAULT_MODEL_NAME_FOR_TEST_FP8 = "neuralmagic/Meta-Llama-3.1-8B-Instruct-FP8"
DEFAULT_MODEL_NAME_FOR_ACCURACY_TEST_FP8 = "neuralmagic/Meta-Llama-3.1-8B-Instruct-FP8"
DEFAULT_MODEL_NAME_FOR_DYNAMIC_QUANT_ACCURACY_TEST_FP8 = (
"neuralmagic/Meta-Llama-3.1-8B-Instruct-FP8-dynamic"
)
DEFAULT_MODEL_NAME_FOR_MODELOPT_QUANT_ACCURACY_TEST_FP8 = (
"nvidia/Llama-3.1-8B-Instruct-FP8"
)
DEFAULT_MODEL_NAME_FOR_TEST_QWEN_FP8 = "Qwen/Qwen3-1.7B-FP8"
DEFAULT_MODEL_NAME_FOR_TEST_FP8_WITH_MOE = "gaunernst/DeepSeek-V2-Lite-Chat-FP8"
# W8A8 models
DEFAULT_MODEL_NAME_FOR_TEST_W8A8 = "RedHatAI/Llama-3.2-3B-quantized.w8a8"
DEFAULT_MODEL_NAME_FOR_TEST_W8A8_WITH_MOE = "nytopop/Qwen3-30B-A3B.w8a8"
# EAGLE
DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST = "meta-llama/Llama-2-7b-chat-hf"
DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST = "lmsys/sglang-EAGLE-llama2-chat-7B"
DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST_EAGLE3 = "meta-llama/Llama-3.1-8B-Instruct"
DEFAULT_MODEL_NAME_FOR_TEST_EAGLE3 = "lmsys/sglang-EAGLE3-LLaMA3.1-Instruct-8B"
DEFAULT_STANDALONE_SPECULATIVE_TARGET_MODEL_FOR_TEST = (
"meta-llama/Llama-3.1-8B-Instruct"
)
DEFAULT_STANDALONE_SPECULATIVE_DRAFT_MODEL_FOR_TEST = "meta-llama/Llama-3.2-1B-Instruct"
DEFAULT_NGRAM_SPECULATIVE_TARGET_MODEL_FOR_TEST = "Qwen/Qwen2.5-Coder-7B-Instruct"
# Other use cases
DEFAULT_MODEL_NAME_FOR_TEST_LOCAL_ATTENTION = (
"meta-llama/Llama-4-Scout-17B-16E-Instruct"
)
DEFAULT_SMALL_EMBEDDING_MODEL_NAME_FOR_TEST = "Alibaba-NLP/gte-Qwen2-1.5B-instruct"
DEFAULT_REASONING_MODEL_NAME_FOR_TEST = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
DEFAULT_DEEPPEP_MODEL_NAME_FOR_TEST = "deepseek-ai/DeepSeek-V3-0324"
DEFAULT_AWQ_MOE_MODEL_NAME_FOR_TEST = (
"hugging-quants/Mixtral-8x7B-Instruct-v0.1-AWQ-INT4"
)
DEFAULT_ENABLE_THINKING_MODEL_NAME_FOR_TEST = "Qwen/Qwen3-30B-A3B"
DEFAULT_DEEPSEEK_W4AFP8_MODEL_FOR_TEST = "Barrrrry/DeepSeek-R1-W4AFP8"
# Nightly tests
DEFAULT_MODEL_NAME_FOR_NIGHTLY_EVAL_TP1 = "meta-llama/Llama-3.1-8B-Instruct,mistralai/Mistral-7B-Instruct-v0.3,deepseek-ai/DeepSeek-Coder-V2-Lite-Instruct,google/gemma-2-27b-it"
DEFAULT_MODEL_NAME_FOR_NIGHTLY_EVAL_TP2 = "meta-llama/Llama-3.1-70B-Instruct,mistralai/Mixtral-8x7B-Instruct-v0.1,Qwen/Qwen2-57B-A14B-Instruct"
DEFAULT_MODEL_NAME_FOR_NIGHTLY_EVAL_FP8_TP1 = "neuralmagic/Meta-Llama-3.1-8B-Instruct-FP8,neuralmagic/Mistral-7B-Instruct-v0.3-FP8,neuralmagic/DeepSeek-Coder-V2-Lite-Instruct-FP8,neuralmagic/gemma-2-2b-it-FP8"
DEFAULT_MODEL_NAME_FOR_NIGHTLY_EVAL_FP8_TP2 = "neuralmagic/Meta-Llama-3.1-70B-Instruct-FP8,neuralmagic/Mixtral-8x7B-Instruct-v0.1-FP8,neuralmagic/Qwen2-72B-Instruct-FP8,neuralmagic/Qwen2-57B-A14B-Instruct-FP8,neuralmagic/DeepSeek-Coder-V2-Lite-Instruct-FP8,zai-org/GLM-4.5-Air-FP8"
DEFAULT_MODEL_NAME_FOR_NIGHTLY_EVAL_QUANT_TP1 = "hugging-quants/Meta-Llama-3.1-8B-Instruct-AWQ-INT4,hugging-quants/Meta-Llama-3.1-8B-Instruct-GPTQ-INT4,hugging-quants/Mixtral-8x7B-Instruct-v0.1-AWQ-INT4"
DEFAULT_SMALL_MODEL_NAME_FOR_TEST_QWEN = "Qwen/Qwen2.5-1.5B-Instruct"
DEFAULT_SMALL_VLM_MODEL_NAME_FOR_TEST = "Qwen/Qwen2.5-VL-3B-Instruct"
DEFAULT_IMAGE_URL = "https://github.com/sgl-project/sglang/blob/main/test/lang/example_image.png?raw=true"
DEFAULT_VIDEO_URL = "https://raw.githubusercontent.com/EvolvingLMMs-Lab/sglang/dev/onevision_local/assets/jobs.mp4"
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH = 600
def is_in_ci():
"""Return whether it is in CI runner."""
return get_bool_env_var("SGLANG_IS_IN_CI")
def is_in_amd_ci():
"""Return whether it is in an AMD CI runner."""
return get_bool_env_var("SGLANG_AMD_CI")
def _use_cached_default_models(model_repo: str):
cache_dir = os.getenv("DEFAULT_MODEL_CACHE_DIR")
if cache_dir and model_repo:
model_path = os.path.join(cache_dir, model_repo)
if os.path.isdir(model_path):
return os.path.abspath(model_path)
return ""
if is_in_ci():
DEFAULT_PORT_FOR_SRT_TEST_RUNNER = (
10000 + int(os.environ.get("CUDA_VISIBLE_DEVICES", "0")[0]) * 1000
)
else:
DEFAULT_PORT_FOR_SRT_TEST_RUNNER = (
20000 + int(os.environ.get("CUDA_VISIBLE_DEVICES", "0")[0]) * 1000
)
DEFAULT_URL_FOR_TEST = f"http://127.0.0.1:{DEFAULT_PORT_FOR_SRT_TEST_RUNNER + 1000}"
if is_in_amd_ci():
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH = 3000
def call_generate_lightllm(prompt, temperature, max_tokens, stop=None, url=None):
assert url is not None
data = {
"inputs": prompt,
"parameters": {
"temperature": temperature,
"max_new_tokens": max_tokens,
"stop_sequences": stop,
},
}
res = requests.post(url, json=data)
assert res.status_code == 200
pred = res.json()["generated_text"][0]
return pred
def find_available_port(base_port: int):
port = base_port + random.randint(100, 1000)
while True:
if is_port_available(port):
return port
if port < 60000:
port += 42
else:
port -= 43
def call_generate_vllm(prompt, temperature, max_tokens, stop=None, n=1, url=None):
assert url is not None
data = {
"prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens,
"stop": stop,
"n": n,
}
res = requests.post(url, json=data)
assert res.status_code == 200
if n == 1:
pred = res.json()["text"][0][len(prompt) :]
else:
pred = [x[len(prompt) :] for x in res.json()["text"]]
return pred
def call_generate_outlines(
prompt, temperature, max_tokens, stop=None, regex=None, n=1, url=None
):
assert url is not None
data = {
"prompt": prompt,
"temperature": temperature,
"max_tokens": max_tokens,
"stop": stop,
"regex": regex,
"n": n,
}
res = requests.post(url, json=data)
assert res.status_code == 200
if n == 1:
pred = res.json()["text"][0][len(prompt) :]
else:
pred = [x[len(prompt) :] for x in res.json()["text"]]
return pred
def call_generate_srt_raw(prompt, temperature, max_tokens, stop=None, url=None):
assert url is not None
data = {
"text": prompt,
"sampling_params": {
"temperature": temperature,
"max_new_tokens": max_tokens,
"stop": stop,
},
}
res = requests.post(url, json=data)
assert res.status_code == 200
obj = res.json()
pred = obj["text"]
return pred
def call_generate_guidance(
prompt, temperature, max_tokens, stop=None, n=1, regex=None, model=None
):
assert model is not None
from guidance import gen
rets = []
for _ in range(n):
out = (
model
+ prompt
+ gen(
name="answer",
max_tokens=max_tokens,
temperature=temperature,
stop=stop,
regex=regex,
)
)
rets.append(out["answer"])
return rets if n > 1 else rets[0]
def call_select_lightllm(context, choices, url=None):
assert url is not None
scores = []
for i in range(len(choices)):
data = {
"inputs": context + choices[i],
"parameters": {
"max_new_tokens": 1,
},
}
res = requests.post(url, json=data)
assert res.status_code == 200
scores.append(0)
return np.argmax(scores)
def call_select_vllm(context, choices, url=None):
assert url is not None
scores = []
for i in range(len(choices)):
data = {
"prompt": context + choices[i],
"max_tokens": 1,
"prompt_logprobs": 1,
}
res = requests.post(url, json=data)
assert res.status_code == 200
scores.append(res.json().get("prompt_score", 0))
return np.argmax(scores)
"""
Modify vllm/entrypoints/api_server.py
if final_output.prompt_logprobs is not None:
score = np.mean([prob[t_id] for t_id, prob in zip(final_output.prompt_token_ids[1:], final_output.prompt_logprobs[1:])])
ret["prompt_score"] = score
"""
def call_select_guidance(context, choices, model=None):
assert model is not None
from guidance import select
out = model + context + select(choices, name="answer")
return choices.index(out["answer"])
def add_common_other_args_and_parse(parser: argparse.ArgumentParser):
parser.add_argument("--parallel", type=int, default=64)
parser.add_argument("--host", type=str, default="http://127.0.0.1")
parser.add_argument("--port", type=int, default=None)
parser.add_argument(
"--backend",
type=str,
required=True,
choices=[
"vllm",
"outlines",
"lightllm",
"gserver",
"guidance",
"srt-raw",
"llama.cpp",
],
)
parser.add_argument("--n-ctx", type=int, default=4096)
parser.add_argument(
"--model-path", type=str, default="meta-llama/Llama-2-7b-chat-hf"
)
parser.add_argument("--result-file", type=str, default="result.jsonl")
args = parser.parse_args()
if args.port is None:
default_port = {
"vllm": 21000,
"outlines": 21000,
"lightllm": 22000,
"srt-raw": 30000,
"gserver": 9988,
}
args.port = default_port.get(args.backend, None)
return args
def auto_config_device() -> str:
"""Auto-config available device platform"""
try:
device = get_device()
except (RuntimeError, ImportError) as e:
print(f"Warning: {e} - Falling back to CPU")
device = "cpu"
return device
def add_common_sglang_args_and_parse(parser: argparse.ArgumentParser):
parser.add_argument("--parallel", type=int, default=64)
parser.add_argument("--host", type=str, default="http://127.0.0.1")
parser.add_argument("--port", type=int, default=30000)
parser.add_argument("--backend", type=str, default="srt")
parser.add_argument(
"--device",
type=str,
default="auto",
choices=["auto", "cuda", "rocm", "cpu"],
help="Device type (auto/cuda/rocm/cpu). Auto will detect available platforms",
)
parser.add_argument("--result-file", type=str, default="result.jsonl")
parser.add_argument("--raw-result-file", type=str)
args = parser.parse_args()
return args
def select_sglang_backend(args: argparse.Namespace):
from sglang.lang.backend.openai import OpenAI
from sglang.lang.backend.runtime_endpoint import RuntimeEndpoint
if args.backend.startswith("srt"):
if args.backend == "srt-no-parallel":
global_config.enable_parallel_encoding = False
backend = RuntimeEndpoint(f"{args.host}:{args.port}")
elif args.backend.startswith("gpt-"):
backend = OpenAI(args.backend)
else:
raise ValueError(f"Invalid backend: {args.backend}")
return backend
def _get_call_generate(args: argparse.Namespace):
if args.backend == "lightllm":
return partial(call_generate_lightllm, url=f"{args.host}:{args.port}/generate")
elif args.backend == "vllm":
return partial(call_generate_vllm, url=f"{args.host}:{args.port}/generate")
elif args.backend == "srt-raw":
return partial(call_generate_srt_raw, url=f"{args.host}:{args.port}/generate")
elif args.backend == "outlines":
return partial(call_generate_outlines, url=f"{args.host}:{args.port}/generate")
elif args.backend == "guidance":
from guidance import models
model = models.LlamaCpp(args.model_path, n_gpu_layers=-1, n_ctx=args.n_ctx)
call_generate = partial(call_generate_guidance, model=model)
call_generate("Hello,", 1.0, 8, ".")
return call_generate
else:
raise ValueError(f"Invalid backend: {args.backend}")
def _get_call_select(args: argparse.Namespace):
if args.backend == "lightllm":
return partial(call_select_lightllm, url=f"{args.host}:{args.port}/generate")
elif args.backend == "vllm":
return partial(call_select_vllm, url=f"{args.host}:{args.port}/generate")
elif args.backend == "guidance":
from guidance import models
model = models.LlamaCpp(args.model_path, n_gpu_layers=-1, n_ctx=args.n_ctx)
call_select = partial(call_select_guidance, model=model)
call_select("Hello,", ["world", "earth"])
return call_select
else:
raise ValueError(f"Invalid backend: {args.backend}")
def get_call_generate(args: argparse.Namespace):
call_generate = _get_call_generate(args)
def func(*args, **kwargs):
try:
return call_generate(*args, **kwargs)
except Exception:
print("Exception in call_generate:\n" + get_exception_traceback())
raise
return func
def get_call_select(args: argparse.Namespace):
call_select = _get_call_select(args)
def func(*args, **kwargs):
try:
return call_select(*args, **kwargs)
except Exception:
print("Exception in call_select:\n" + get_exception_traceback())
raise
return func
def _get_default_models():
import inspect
current_module = inspect.getmodule(_get_default_models)
default_models = set()
for name, value in current_module.__dict__.items():
if (
isinstance(name, str)
and "DEFAULT_" in name
and "MODEL_" in name
and isinstance(value, str)
):
if "," in value:
parts = [part.strip() for part in value.split(",")]
default_models.update(parts)
else:
default_models.add(value.strip())
return json.dumps(list(default_models))
def try_cached_model(model_repo: str):
model_dir = _use_cached_default_models(model_repo)
return model_dir if model_dir else model_repo
def popen_with_error_check(command: list[str], allow_exit: bool = False):
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def _run_and_check():
stdout, stderr = process.communicate()
while process.poll() is None:
time.sleep(5)
if not allow_exit or process.returncode != 0:
raise Exception(
f"{command} exited with code {process.returncode}\n{stdout=}\n{stderr=}"
)
t = threading.Thread(target=_run_and_check)
t.start()
return process
def popen_launch_server(
model: str,
base_url: str,
timeout: float,
api_key: Optional[str] = None,
other_args: Optional[list[str]] = None,
env: Optional[dict] = None,
return_stdout_stderr: Optional[tuple] = None,
device: str = "auto",
pd_separated: bool = False,
num_replicas: Optional[int] = None,
):
"""Launch a server process with automatic device detection.
Args:
device: Device type ("auto", "cuda", "rocm" or "cpu").
If "auto", will detect available platforms automatically.
"""
other_args = other_args or []
# Auto-detect device if needed
if device == "auto":
device = auto_config_device()
other_args = list(other_args)
other_args += ["--device", str(device)]
_, host, port = base_url.split(":")
host = host[2:]
use_mixed_pd_engine = not pd_separated and num_replicas is not None
if pd_separated or use_mixed_pd_engine:
command = "sglang.launch_pd_server"
else:
command = "sglang.launch_server"
command = [
"python3",
"-m",
command,
"--model-path",
model,
*[str(x) for x in other_args],
]
if pd_separated or use_mixed_pd_engine:
command.extend(
[
"--lb-host",
host,
"--lb-port",
port,
]
)
else:
command.extend(
[
"--host",
host,
"--port",
port,
]
)
if use_mixed_pd_engine:
command.extend(
[
"--mixed",
"--num-replicas",
str(num_replicas),
]
)
if api_key:
command += ["--api-key", api_key]
print(f"command={' '.join(command)}")
if return_stdout_stderr:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=1,
)
def _dump(src, sinks):
for line in iter(src.readline, ""):
for sink in sinks:
sink.write(line)
sink.flush()
src.close()
threading.Thread(
target=_dump,
args=(process.stdout, [return_stdout_stderr[0], sys.stdout]),
daemon=True,
).start()
threading.Thread(
target=_dump,
args=(process.stderr, [return_stdout_stderr[1], sys.stderr]),
daemon=True,
).start()
else:
process = subprocess.Popen(command, stdout=None, stderr=None, env=env)
start_time = time.perf_counter()
with requests.Session() as session:
while time.perf_counter() - start_time < timeout:
return_code = process.poll()
if return_code is not None:
# Server failed to start (non-zero exit code) or crashed
raise Exception(
f"Server process exited with code {return_code}. "
"Check server logs for errors."
)
try:
headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": f"Bearer {api_key}",
}
response = session.get(
f"{base_url}/health_generate",
headers=headers,
)
if response.status_code == 200:
return process
except requests.RequestException:
pass
return_code = process.poll()
if return_code is not None:
raise Exception(
f"Server unexpectedly exits ({return_code=}). Usually there will be error logs describing the cause far above this line."
)
time.sleep(10)
kill_process_tree(process.pid)
raise TimeoutError("Server failed to start within the timeout period.")
def popen_launch_pd_server(
model: str,
base_url: str,
timeout: float,
api_key: Optional[str] = None,
other_args: list[str] = (),
env: Optional[dict] = None,
):
_, host, port = base_url.split(":")
host = host[2:]
command = "sglang.launch_server"
command = [
"python3",
"-m",
command,
"--model-path",
model,
*[str(x) for x in other_args],
]
command.extend(
[
"--host",
host,
"--port",
port,
]
)
if api_key:
command += ["--api-key", api_key]
print(f"command={' '.join(command)}")
process = subprocess.Popen(command, stdout=None, stderr=None, env=env)
return process
def run_with_timeout(
func: Callable,
args: tuple = (),
kwargs: Optional[dict] = None,
timeout: float = None,
):
"""Run a function with timeout."""
ret_value = []
def _target_func():
ret_value.append(func(*args, **(kwargs or {})))
t = threading.Thread(target=_target_func)
t.start()
t.join(timeout=timeout)
if t.is_alive():
raise TimeoutError()
if not ret_value:
raise RuntimeError()
return ret_value[0]
@dataclass
class TestFile:
name: str
estimated_time: float = 60
def run_unittest_files(files: List[TestFile], timeout_per_file: float):
tic = time.perf_counter()
success = True
for i, file in enumerate(files):
filename, estimated_time = file.name, file.estimated_time
process = None
def run_one_file(filename):
nonlocal process
filename = os.path.join(os.getcwd(), filename)
print(
f".\n.\nBegin ({i}/{len(files) - 1}):\npython3 {filename}\n.\n.\n",
flush=True,
)
tic = time.perf_counter()
process = subprocess.Popen(
["python3", filename], stdout=None, stderr=None, env=os.environ
)
process.wait()
elapsed = time.perf_counter() - tic
print(
f".\n.\nEnd ({i}/{len(files) - 1}):\n{filename=}, {elapsed=:.0f}, {estimated_time=}\n.\n.\n",
flush=True,
)
return process.returncode
try:
ret_code = run_with_timeout(
run_one_file, args=(filename,), timeout=timeout_per_file
)
assert (
ret_code == 0
), f"expected return code 0, but {filename} returned {ret_code}"
except TimeoutError:
kill_process_tree(process.pid)
time.sleep(5)
print(
f"\nTimeout after {timeout_per_file} seconds when running {filename}\n",
flush=True,
)
success = False
break
if success:
print(f"Success. Time elapsed: {time.perf_counter() - tic:.2f}s", flush=True)
else:
print(f"Fail. Time elapsed: {time.perf_counter() - tic:.2f}s", flush=True)
return 0 if success else -1
def get_similarities(vec1, vec2):
return F.cosine_similarity(torch.tensor(vec1), torch.tensor(vec2), dim=0)
def get_benchmark_args(
base_url="",
dataset_name="",
dataset_path="",
tokenizer="",
num_prompts=500,
sharegpt_output_len=None,
random_input_len=4096,
random_output_len=2048,
sharegpt_context_len=None,
request_rate=float("inf"),
disable_stream=False,
disable_ignore_eos=False,
seed: int = 0,
device="auto",
pd_separated: bool = False,
lora_name=None,
):
return SimpleNamespace(
backend="sglang",
base_url=base_url,
host=None,
port=None,
dataset_name=dataset_name,
dataset_path=dataset_path,
model=None,
tokenizer=tokenizer,
num_prompts=num_prompts,
sharegpt_output_len=sharegpt_output_len,
sharegpt_context_len=sharegpt_context_len,
random_input_len=random_input_len,
random_output_len=random_output_len,
random_range_ratio=0.0,
request_rate=request_rate,
multi=None,
output_file=None,
disable_tqdm=False,
disable_stream=disable_stream,
return_logprob=False,
seed=seed,
disable_ignore_eos=disable_ignore_eos,
extra_request_body=None,
apply_chat_template=False,
profile=None,
lora_name=lora_name,
prompt_suffix="",
device=device,
pd_separated=pd_separated,
)
def run_bench_serving(
model,
num_prompts,
request_rate,
other_server_args,
dataset_name="random",
dataset_path="",
tokenizer=None,
random_input_len=4096,
random_output_len=2048,
sharegpt_context_len=None,
disable_stream=False,
disable_ignore_eos=False,
need_warmup=False,
seed: int = 0,
device="auto",
background_task: Optional[Callable[[str, asyncio.Event], Awaitable[None]]] = None,
lora_name: Optional[str] = None,
):
if device == "auto":
device = auto_config_device()
# Launch the server
base_url = DEFAULT_URL_FOR_TEST
process = popen_launch_server(
model,
base_url,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=other_server_args,
)
# Run benchmark
args = get_benchmark_args(
base_url=base_url,
dataset_name=dataset_name,
dataset_path=dataset_path,
tokenizer=tokenizer,
num_prompts=num_prompts,
random_input_len=random_input_len,
random_output_len=random_output_len,
sharegpt_context_len=sharegpt_context_len,
request_rate=request_rate,
disable_stream=disable_stream,
disable_ignore_eos=disable_ignore_eos,
seed=seed,
device=device,
lora_name=lora_name,
)
async def _run():
if need_warmup:
warmup_args = copy.deepcopy(args)
warmup_args.num_prompts = 16
await asyncio.to_thread(run_benchmark, warmup_args)
start_event = asyncio.Event()
stop_event = asyncio.Event()
task_handle = (
asyncio.create_task(background_task(base_url, start_event, stop_event))
if background_task
else None
)
try:
start_event.set()
result = await asyncio.to_thread(run_benchmark, args)
finally:
if task_handle:
stop_event.set()
await task_handle
return result
try:
res = asyncio.run(_run())
finally:
kill_process_tree(process.pid)
assert res["completed"] == num_prompts
return res
def run_score_benchmark(
model,
num_requests=100,
batch_size=5,
other_server_args=None,
need_warmup=False,
device="auto",
):
"""Score API benchmark function compatible with run_bench_serving pattern"""
if other_server_args is None:
other_server_args = []
if device == "auto":
device = auto_config_device()
# Launch the server (consistent with run_bench_serving)
base_url = DEFAULT_URL_FOR_TEST
process = popen_launch_server(
model,
base_url,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=other_server_args,
)
async def _run_benchmark():
# Load tokenizer for generating test data
from sglang.srt.utils.hf_transformers_utils import get_tokenizer
tokenizer = get_tokenizer(model)
# Score API configuration
score_query_tokens = 120
score_item_tokens = 180
score_label_token_ids = [9454, 2753] # Yes/No token IDs
special_token = "<|im_start|>"
def generate_text_with_token_count(num_tokens):
"""Generate text with precise token count using replicated token."""
text = special_token * num_tokens
actual_tokens = len(tokenizer.encode(text, add_special_tokens=False))
if actual_tokens != num_tokens:
text = special_token * (
num_tokens
// len(tokenizer.encode(special_token, add_special_tokens=False))
)
return text
if need_warmup:
warmup_data = {
"query": generate_text_with_token_count(score_query_tokens),
"items": [
generate_text_with_token_count(score_item_tokens) for _ in range(3)
],
"label_token_ids": score_label_token_ids,
"model": model,
"apply_softmax": True,
}
async with aiohttp.ClientSession() as session:
try:
await session.post(
f"{base_url}/v1/score",
json=warmup_data,
timeout=aiohttp.ClientTimeout(total=30),
)
except:
pass # Ignore warmup errors
test_requests = []
for i in range(num_requests):
query = generate_text_with_token_count(score_query_tokens)
items = [
generate_text_with_token_count(score_item_tokens)
for _ in range(batch_size)
]
score_data = {
"query": query,
"items": items,
"label_token_ids": score_label_token_ids,
"model": model,
"apply_softmax": True,
}
test_requests.append(score_data)
start_time = time.monotonic()
successful_requests = 0
total_latency = 0
latencies = []
async with aiohttp.ClientSession() as session:
for request_data in test_requests:
try:
request_start = time.monotonic()
async with session.post(
f"{base_url}/v1/score",
json=request_data,
timeout=aiohttp.ClientTimeout(total=30),
) as response:
if response.status == 200:
response_data = await response.json()
request_end = time.monotonic()
if "scores" in response_data or "logprobs" in response_data:
latency_ms = (request_end - request_start) * 1000
latencies.append(latency_ms)
total_latency += latency_ms
successful_requests += 1
except Exception:
continue
end_time = time.monotonic()
total_time = end_time - start_time
if successful_requests > 0:
throughput = successful_requests / total_time
avg_latency = total_latency / successful_requests
latencies.sort()
p95_latency = latencies[int(len(latencies) * 0.95)] if latencies else 0
return {
"completed": successful_requests,
"total_requests": num_requests,
"throughput": throughput,
"avg_latency_ms": avg_latency,
"p95_latency_ms": p95_latency,
"successful_requests": successful_requests,
}
else:
return {
"completed": 0,
"total_requests": num_requests,
"throughput": 0,
"avg_latency_ms": 0,
"p95_latency_ms": 0,
"successful_requests": 0,
}
try:
res = asyncio.run(_run_benchmark())
finally:
kill_process_tree(process.pid)
assert res["completed"] == res["successful_requests"]
return res
def run_bench_serving_multi(
model,
base_url,
other_server_args,
benchmark_args,
need_warmup=False,
pd_separated=False,
):
# Launch the server
process = popen_launch_server(
model,
base_url,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=other_server_args,
pd_separated=pd_separated,
)
# run benchmark for all
res_l = []
try:
for args in benchmark_args:
if need_warmup:
warmup_args = copy.deepcopy(args)
warmup_args.num_prompts = 16
run_benchmark(warmup_args)
res = run_benchmark(args)
res_l.append((args, res))
finally:
kill_process_tree(process.pid)
return res_l
def run_bench_one_batch(model, other_args):
"""Launch a offline process with automatic device detection.
Args:
device: Device type ("auto", "cuda", "rocm" or "cpu").
If "auto", will detect available platforms automatically.
"""
# Auto-detect device if needed
device = auto_config_device()
print(f"Auto-configed device: {device}", flush=True)
other_args += ["--device", str(device)]
command = [
"python3",
"-m",
"sglang.bench_one_batch",
"--batch-size",
"1",
"--input",
"128",
"--output",
"8",
*[str(x) for x in other_args],
]
if model is not None:
command += ["--model-path", model]
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
stdout, stderr = process.communicate()
output = stdout.decode()
error = stderr.decode()
print(f"Output: {output}", flush=True)
print(f"Error: {error}", flush=True)
# Return prefill_latency, decode_throughput, decode_latency
prefill_line = output.split("\n")[-9]
decode_line = output.split("\n")[-3]
pattern = (
r"latency: (?P<latency>\d+\.\d+).*?throughput:\s*(?P<throughput>\d+\.\d+)"
)
match = re.search(pattern, prefill_line)
if match:
prefill_latency = float(match.group("latency"))
match = re.search(pattern, decode_line)
if match:
decode_latency = float(match.group("latency"))
decode_throughput = float(match.group("throughput"))
finally:
kill_process_tree(process.pid)
return prefill_latency, decode_throughput, decode_latency
def run_bench_offline_throughput(model, other_args):
command = [
"python3",
"-m",
"sglang.bench_offline_throughput",
"--num-prompts",
"1",
"--dataset-name",
"random",
"--random-input-len",
"256",
"--random-output-len",
"256",
"--model-path",
model,
*[str(x) for x in other_args],
]
print(f"command={' '.join(command)}")
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
stdout, stderr = process.communicate()
output = stdout.decode()
error = stderr.decode()
print(f"Output: {output}", flush=True)
print(f"Error: {error}", flush=True)
output_throughput = -1
for line in output.split("\n"):
if "Last generation throughput (tok/s):" in line:
output_throughput = float(line.split(":")[-1])
finally:
kill_process_tree(process.pid)
return output_throughput
def run_bench_one_batch_server(
model,
base_url,
server_args,
bench_args,
other_server_args,
simulate_spec_acc_lens=None,
):
from sglang.bench_one_batch_server import run_benchmark
if simulate_spec_acc_lens is not None:
env = {**os.environ, "SIMULATE_ACC_LEN": str(simulate_spec_acc_lens)}
else:
env = None
process = popen_launch_server(
model,
base_url,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=other_server_args,
env=env,
)
try:
run_benchmark(server_args=server_args, bench_args=bench_args)
finally:
kill_process_tree(process.pid)
def lcs(X, Y):
m = len(X)
n = len(Y)
L = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(m + 1):
for j in range(n + 1):
if i == 0 or j == 0:
L[i][j] = 0
elif X[i - 1] == Y[j - 1]:
L[i][j] = L[i - 1][j - 1] + 1
else:
L[i][j] = max(L[i - 1][j], L[i][j - 1])
return L[m][n]
def calculate_rouge_l(output_strs_list1, output_strs_list2):
"""calculate the ROUGE-L score"""
rouge_l_scores = []
for s1, s2 in zip(output_strs_list1, output_strs_list2):
lcs_len = lcs(s1, s2)
precision = lcs_len / len(s1) if len(s1) > 0 else 0
recall = lcs_len / len(s2) if len(s2) > 0 else 0
if precision + recall > 0:
fmeasure = (2 * precision * recall) / (precision + recall)
else:
fmeasure = 0.0
rouge_l_scores.append(fmeasure)
return rouge_l_scores
STDERR_FILENAME = "/tmp/stderr.txt"
STDOUT_FILENAME = "/tmp/stdout.txt"
def read_output(output_lines: List[str], filename: str = STDERR_FILENAME):
"""Print the output in real time with another thread."""
while not os.path.exists(filename):
time.sleep(0.01)
pt = 0
while pt >= 0:
if pt > 0 and not os.path.exists(filename):
break
try:
lines = open(filename).readlines()
except FileNotFoundError:
print(f"{pt=}, {os.path.exists(filename)=}")
raise
for line in lines[pt:]:
print(line, end="", flush=True)
output_lines.append(line)
pt += 1
time.sleep(0.1)
def run_and_check_memory_leak(
workload_func,
disable_radix_cache,
enable_mixed_chunk,
disable_overlap,
chunked_prefill_size,
assert_has_abort,
):
other_args = [
"--chunked-prefill-size",
str(chunked_prefill_size),
"--log-level",
"debug",
]
if disable_radix_cache:
other_args += ["--disable-radix-cache"]
if enable_mixed_chunk:
other_args += ["--enable-mixed-chunk"]
if disable_overlap:
other_args += ["--disable-overlap-schedule"]
model = DEFAULT_MODEL_NAME_FOR_TEST
port = random.randint(4000, 5000)
base_url = f"http://127.0.0.1:{port}"
# Create files and launch the server
stdout = open(STDOUT_FILENAME, "w")
stderr = open(STDERR_FILENAME, "w")
process = popen_launch_server(
model,
base_url,
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
other_args=other_args,
return_stdout_stderr=(stdout, stderr),
)
# Launch a thread to stream the output
output_lines = []
t = threading.Thread(target=read_output, args=(output_lines,))
t.start()
# Run the workload
workload_func(base_url, model)
# Clean up everything
kill_process_tree(process.pid)
stdout.close()
stderr.close()
if os.path.exists(STDOUT_FILENAME):
os.remove(STDOUT_FILENAME)
if os.path.exists(STDERR_FILENAME):
os.remove(STDERR_FILENAME)
kill_process_tree(process.pid)
t.join()
# Assert success
has_new_server = False
has_leak = False
has_abort = False
for line in output_lines:
if "Uvicorn running" in line:
has_new_server = True
if "leak" in line:
has_leak = True
if "Abort" in line:
has_abort = True
assert has_new_server
assert not has_leak
if assert_has_abort:
assert has_abort
def run_command_and_capture_output(command, env: Optional[dict] = None):
stdout = open(STDOUT_FILENAME, "w")
stderr = open(STDERR_FILENAME, "w")
process = subprocess.Popen(
command, stdout=stdout, stderr=stdout, env=env, text=True
)
# Launch a thread to stream the output
output_lines = []
t = threading.Thread(target=read_output, args=(output_lines, STDOUT_FILENAME))
t.start()
# Join the process
process.wait()
stdout.close()
stderr.close()
if os.path.exists(STDOUT_FILENAME):
os.remove(STDOUT_FILENAME)
if os.path.exists(STDERR_FILENAME):
os.remove(STDERR_FILENAME)
kill_process_tree(process.pid)
t.join()
return output_lines
def run_mmlu_test(
disable_radix_cache=False,
enable_mixed_chunk=False,
disable_overlap=False,
chunked_prefill_size=32,
):
def workload_func(base_url, model):
# Run the eval
args = SimpleNamespace(
base_url=base_url,
model=model,
eval_name="mmlu",
num_examples=128,
num_threads=128,
)
try:
metrics = run_eval(args)
assert metrics["score"] >= 0.65, f"{metrics=}"
finally:
pass
run_and_check_memory_leak(
workload_func,
disable_radix_cache,
enable_mixed_chunk,
disable_overlap,
chunked_prefill_size,
assert_has_abort=False,
)
def run_mulit_request_test(
disable_radix_cache=False,
enable_mixed_chunk=False,
enable_overlap=False,
chunked_prefill_size=32,
):
def workload_func(base_url, model):
def run_one(_):
prompt = """
System: You are a helpful assistant.
User: What is the capital of France?
Assistant: The capital of France is
"""
response = requests.post(
f"{base_url}/generate",
json={
"text": prompt,
"sampling_params": {
"temperature": 0,
"max_new_tokens": 8,
},
},
)
ret = response.json()
with ThreadPoolExecutor(2) as executor:
list(executor.map(run_one, list(range(4))))
run_and_check_memory_leak(
workload_func,
disable_radix_cache,
enable_mixed_chunk,
enable_overlap,
chunked_prefill_size,
assert_has_abort=False,
)
def write_github_step_summary(content):
if not os.environ.get("GITHUB_STEP_SUMMARY"):
logging.warning("GITHUB_STEP_SUMMARY environment variable not set")
return
with open(os.environ["GITHUB_STEP_SUMMARY"], "a") as f:
f.write(content)
def run_logprob_check(self: unittest.TestCase, arg: Tuple):
(
input_len,
output_len,
temperature,
logprob_start_len,
return_logprob,
top_logprobs_num,
) = arg
input_ids = list(range(input_len))
response = requests.post(
self.base_url + "/generate",
json={
"input_ids": input_ids,
"sampling_params": {
"temperature": temperature,
"max_new_tokens": output_len,
"ignore_eos": True,
},
"return_logprob": return_logprob,
"logprob_start_len": logprob_start_len,
"top_logprobs_num": top_logprobs_num,
},
)
response_json = response.json()
res = response_json
self.assertEqual(res["meta_info"]["prompt_tokens"], input_len)
self.assertEqual(res["meta_info"]["completion_tokens"], output_len)
# Test the number of tokens are correct
if return_logprob:
self.assertEqual(
len(res["meta_info"]["input_token_logprobs"]) + logprob_start_len,
res["meta_info"]["prompt_tokens"],
)
self.assertEqual(len(res["meta_info"]["output_token_logprobs"]), output_len)
if top_logprobs_num:
self.assertEqual(
len(res["meta_info"]["input_top_logprobs"]) + logprob_start_len,
res["meta_info"]["prompt_tokens"],
)
self.assertEqual(len(res["meta_info"]["output_top_logprobs"]), output_len)
for i in range(output_len):
self.assertEqual(
len(res["meta_info"]["output_top_logprobs"][i]),
top_logprobs_num,
)
# Test the top-1 tokens are the same as output tokens if temperature == 0
if temperature == 0:
rank = 0
while rank < len(res["meta_info"]["output_top_logprobs"][i]):
try:
self.assertListEqual(
res["meta_info"]["output_token_logprobs"][i],
res["meta_info"]["output_top_logprobs"][i][rank],
)
break
except AssertionError:
# There's a tie. Allow the second item in this case.
if (
res["meta_info"]["output_top_logprobs"][i][rank][0]
== res["meta_info"]["output_top_logprobs"][i][rank + 1][
0
]
):
rank += 1
else:
raise
def send_generate_requests(base_url: str, num_requests: int) -> List[str]:
"""Sends generate request serially and returns status codes. Max concurrency is 1."""
def generate():
prompt = """
System: You are a helpful assistant.
User: What is the capital of France?
Assistant: The capital of France is
"""
response = requests.post(
f"{base_url}/generate",
json={
"text": prompt,
"sampling_params": {
"temperature": 0,
"max_new_tokens": 50,
},
},
)
return response.status_code
return [generate() for _ in range(num_requests)]
async def send_concurrent_generate_requests(
base_url: str, num_requests: int
) -> List[str]:
"""Sends generate request concurrently and returns status codes. Max concurrency is num_requests."""
async def async_generate():
async with aiohttp.ClientSession() as session:
prompt = """
System: You are a helpful assistant.
User: What is the capital of France?
Assistant: The capital of France is
"""
async with session.post(
f"{base_url}/generate",
json={
"text": prompt,
"sampling_params": {
"temperature": 0,
"max_new_tokens": 50,
},
},
) as response:
return response.status
tasks = [asyncio.create_task(async_generate()) for _ in range(num_requests)]
return await asyncio.gather(*tasks)
async def send_concurrent_generate_requests_with_custom_params(
base_url: str,
custom_params: List[dict[str, Any]],
) -> Tuple[int, Any]:
"""Sends generate request concurrently with custom parameters and returns status code and response json tuple. Max concurrency is num_requests."""
base_payload = {
"text": """
System: You are a helpful assistant.
User: What is the capital of France?
Assistant: The capital of France is
""",
"sampling_params": {
"temperature": 0,
"max_new_tokens": 50,
},
}
async def async_generate_with_priority(req):
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/generate",
json=req,
) as response:
resp_json = await response.json()
return (response.status, resp_json)
tasks = []
for c in custom_params:
req = base_payload.copy()
req.update(c)
tasks.append(asyncio.create_task(async_generate_with_priority(req)))
return await asyncio.gather(*tasks)
class CustomTestCase(unittest.TestCase):
def _callTestMethod(self, method):
max_retry = int(
os.environ.get("SGLANG_TEST_MAX_RETRY", "1" if is_in_ci() else "0")
)
retry(
lambda: super(CustomTestCase, self)._callTestMethod(method),
max_retry=max_retry,
)
def dump_bench_raw_result(
path: str,
states,
preds,
labels,
):
if not path:
return
rows = []
for i in range(len(states)):
state = states[i]
output = state["answer"]
prompt = _ensure_remove_suffix(state.text(), output)
rows.append(
dict(
prompt_id=i,
prompt=prompt,
output=output,
correct=bool(preds[i] == labels[i]),
)
)
print(f"BenchRawResultDumper save results to {path}")
Path(path).write_text("\n".join(json.dumps(row) for row in rows))
def _ensure_remove_suffix(text: str, suffix: str):
assert text.endswith(suffix)
return text.removesuffix(suffix)
class ModelLaunchSettings:
def __init__(
self,
model_path: str,
tp_size: int = 1,
extra_args: Optional[List[str]] = None,
env: Optional[dict] = None,
):
self.model_path = model_path
self.tp_size = tp_size
self.extra_args = list(extra_args) if extra_args else []
self.env = env
if self.tp_size > 1 and "--tp" not in self.extra_args:
self.extra_args.extend(["--tp", str(self.tp_size)])
fixed_args = ["--enable-multimodal", "--trust-remote-code"]
for fixed_arg in fixed_args:
if fixed_arg not in self.extra_args:
self.extra_args.append(fixed_arg)
class ModelEvalMetrics:
def __init__(self, accuracy: float, eval_time: float):
self.accuracy = accuracy
self.eval_time = eval_time
def extract_trace_link_from_bench_one_batch_server_output(output: str) -> str:
match = re.search(r"\[Profile\]\((.*?)\)", output)
if match:
trace_link = match.group(1)
return trace_link
return None
def parse_models(model_string: str):
return [model.strip() for model in model_string.split(",") if model.strip()]
def check_evaluation_test_results(
results,
test_name,
model_accuracy_thresholds,
model_latency_thresholds=None,
model_count=None,
):
"""
results: list of tuple of (model_path, accuracy, latency)
"""
failed_models = []
if model_latency_thresholds is not None:
summary = " | model | status | score | score_threshold | latency | latency_threshold | \n"
summary += "| ----- | ------ | ----- | --------------- | ------- | ----------------- | \n"
else:
summary = " | model | status | score | score_threshold | \n"
summary += "| ----- | ------ | ----- | --------------- | \n"
results_dict = {res[0]: (res[1], res[2]) for res in results}
for model, accuracy_threshold in sorted(model_accuracy_thresholds.items()):
latency_threshold = (
model_latency_thresholds.get(model)
if model_latency_thresholds is not None
else 1e9
)
if model in results_dict:
accuracy, latency = results_dict[model]
is_success = accuracy >= accuracy_threshold and latency <= latency_threshold
status_emoji = "" if is_success else ""
if not is_success:
if accuracy < accuracy_threshold:
failed_models.append(
f"\nScore Check Failed: {model}\n"
f"Model {model} score ({accuracy:.4f}) is below threshold ({accuracy_threshold:.4f})"
)
if latency > latency_threshold:
failed_models.append(
f"\nLatency Check Failed: {model}\n"
f"Model {model} latency ({latency:.4f}) is above threshold ({latency_threshold:.4f})"
)
if model_latency_thresholds is not None:
line = f"| {model} | {status_emoji} | {accuracy} | {accuracy_threshold} | {latency} | {latency_threshold}\n"
else:
line = (
f"| {model} | {status_emoji} | {accuracy} | {accuracy_threshold}\n"
)
else:
status_emoji = ""
failed_models.append(f"Model failed to launch or be evaluated: {model}")
if model_latency_thresholds is not None:
line = f"| {model} | {status_emoji} | N/A | {accuracy_threshold} | N/A | {latency_threshold}\n"
else:
line = f"| {model} | {status_emoji} | N/A | {accuracy_threshold}\n"
summary += line
print(summary)
if is_in_ci():
write_github_step_summary(f"## {test_name}\n{summary}")
if failed_models:
print("Some models failed the evaluation.")
raise AssertionError("\n".join(failed_models))
# Bench knobs for bench_one_batch_server (override by env)
def _parse_int_list_env(name: str, default_val: str):
val = os.environ.get(name, default_val)
return [int(x) for x in val.split(",") if x]
# Return filenames
def find_traces_under_path(path: str) -> List[str]:
results = []
for _, dirs, files in os.walk(path):
for file in files:
if file.endswith(".trace.json.gz"):
results.append(f"{file}")
return results
def write_results_to_json(model, metrics, mode="a"):
result = {
"timestamp": datetime.now().isoformat(),
"model": model,
"metrics": metrics,
"score": metrics["score"],
}
if "latency" in metrics:
result["latency"] = (metrics.get("latency"),)
existing_results = []
if mode == "a" and os.path.exists("results.json"):
try:
with open("results.json", "r") as f:
existing_results = json.load(f)
except json.JSONDecodeError:
existing_results = []
if isinstance(existing_results, list):
existing_results.append(result)
else:
existing_results = [result]
with open("results.json", "w") as f:
json.dump(existing_results, f, indent=2)