Files
sglang/benchmark/score/bench_score.py
2025-08-14 05:12:24 +08:00

604 lines
22 KiB
Python

"""
SGLang Scoring Benchmark Script

This script benchmarks the performance of SGLang's scoring API over HTTP.

Current Features:
- HTTP-only implementation (open source compatible)
- Sends requests directly to the /v1/score API endpoint
- Scores a batch of items against one query in each request
- Configurable RPS, duration, and batch sizes (items per request)
- Progress bars plus per-minute and overall latency metrics (avg, P50, P90, P99)
- Poisson and constant request arrival distributions

Usage:
- Update the configuration variables in the CONFIG section of this file
- Ensure an SGLang server is running at the configured HTTP_URL
- Run: python bench_score.py
- For each value listed in ITEM_COUNT_VALUES, a benchmark is run in which
  every request contains that many items for batch scoring
"""
import asyncio
import concurrent.futures # For parallel prompt generation
import json
import os
import random
from statistics import mean
import aiohttp
import numpy as np
from tqdm import tqdm
from transformers import AutoTokenizer
###############################################################################
# CONFIG
###############################################################################
# Server Configuration
SERVER_TYPE = "HTTP"  # Fixed to HTTP for open source; echoed in summaries/CSV
# HTTP Configuration
HTTP_URL = "http://localhost:30000/v1/score"  # Use score API directly
# Score API Config
# ITEM_COUNT_VALUES determines number of items per score request (batch size)
# Target token lengths of the generated query and of each item. The text is
# built by repeating SPECIAL_REPLICATED_TOKEN.
SCORE_QUERY_TOKENS = 120
SCORE_ITEM_TOKENS = 180
# Used both to load the tokenizer and as the "model" field of every request.
SCORE_MODEL_PATH = "Qwen/Qwen3-0.6B"
# Sent as "label_token_ids" in every request. Yes/No token IDs.
# NOTE(review): these IDs are tokenizer-specific; re-check them whenever
# SCORE_MODEL_PATH changes.
SCORE_LABEL_TOKEN_IDS = [9454, 2753]
# Every combination of DURATION_SECS_VALUES x RPS_VALUES x ITEM_COUNT_VALUES
# is run as a separate benchmark, one after another.
# Array of RPS values to test
RPS_VALUES = [70]
# Array of duration values to test
DURATION_SECS_VALUES = [60]  # Duration values in seconds
# Array of item count values to test
ITEM_COUNT_VALUES = [10]  # Number of items per request
# Number of unique requests to generate (reused cyclically to reach
# rps * duration requests).
# NOTE: prompt generation is deterministic, so these "unique" requests
# currently all have identical content.
NUM_UNIQUE_REQUESTS = 100
# Inter-arrival pattern: "CONSTANT" waits 1/rps between sends; "POISSON" draws
# exponentially distributed waits (mean 1/rps).
DISTRIBUTION = "POISSON"  # Options: "CONSTANT", "POISSON"
# Profiling Configuration
# When True, run_benchmark POSTs to the server's /start_profile endpoint before
# sending and to /stop_profile after all requests have completed.
PROFILE = False
# Directory for profiler output.
# NOTE(review): this only sets the variable in this client process; presumably
# the server process must see it for traces to land here - confirm.
SGLANG_TORCH_PROFILER_DIR = "/shared/user/sglang-oss-trace/remove-decode"
if PROFILE:
    os.environ["SGLANG_TORCH_PROFILER_DIR"] = SGLANG_TORCH_PROFILER_DIR
# Special token to replicate for precise token counting; it is expected to
# encode to exactly one token (checked when requests are prepared).
SPECIAL_REPLICATED_TOKEN = "<|im_start|>"
###############################################################################
# REQUEST GENERATION (in parallel)
###############################################################################
def prepare_all_requests_parallel(num_requests, item_count):
    """
    Build NUM_UNIQUE_REQUESTS score payloads in a thread pool, then cycle
    through them to produce the full request list.

    Each payload is a dict in /v1/score format with the keys "query",
    "items", "label_token_ids" and "model". The query is built to be
    SCORE_QUERY_TOKENS tokens long, and each of the ``item_count`` items
    SCORE_ITEM_TOKENS tokens long.

    NOTE: text generation is deterministic, so every "unique" payload has
    identical content, and the returned list reuses the same dict objects.

    Args:
        num_requests: Total number of payloads to return.
        item_count: Number of items in each payload.

    Returns:
        A list of ``num_requests`` payload dicts.

    Raises:
        RuntimeError: If SPECIAL_REPLICATED_TOKEN encodes to zero tokens, or
            if no payload could be built. Tokenizer errors propagate as-is.
    """
    # Load tokenizer once here to verify special token and get precise counts
    print("Loading tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(SCORE_MODEL_PATH)
    # Verify that our special token produces exactly 1 token
    special_token_count = len(
        tokenizer.encode(SPECIAL_REPLICATED_TOKEN, add_special_tokens=False)
    )
    print(
        f"Special token '{SPECIAL_REPLICATED_TOKEN}' produces "
        f"{special_token_count} token(s)"
    )
    if special_token_count == 0:
        # Fail fast: the repetition count below would divide by zero in
        # every worker, and those errors were only reported indirectly.
        raise RuntimeError(
            f"Special token '{SPECIAL_REPLICATED_TOKEN}' produces 0 tokens; "
            "cannot build prompts with a target token count"
        )
    if special_token_count > 1:
        # Warn once here instead of once per generated text.
        print(
            f"Special token '{SPECIAL_REPLICATED_TOKEN}' produces more than 1 token!!!"
        )

    def generate_text_with_token_count(num_toks):
        """Return text that encodes to (at least) ``num_toks`` tokens."""
        if special_token_count == 1:
            # Simple case: each repetition is exactly one token.
            return SPECIAL_REPLICATED_TOKEN * num_toks
        # Each repetition yields several tokens; ceil-divide so the text has
        # at least num_toks tokens.
        repetitions = (num_toks + special_token_count - 1) // special_token_count
        text = SPECIAL_REPLICATED_TOKEN * repetitions
        # Verify we got the expected token count (approximately)
        actual_tokens = len(tokenizer.encode(text, add_special_tokens=False))
        if actual_tokens < num_toks:
            print(
                f"Warning: Generated {actual_tokens} tokens, "
                f"expected {num_toks}"
            )
        return text

    # Generation is deterministic, so compute the texts once instead of
    # re-running (and possibly re-tokenizing) it for every query and item.
    query_text = generate_text_with_token_count(SCORE_QUERY_TOKENS)
    item_text = generate_text_with_token_count(SCORE_ITEM_TOKENS)

    def build_request(index):
        """Assemble payload ``index``; returns ``(index, payload or None)``."""
        try:
            # Return as dict for score API format
            score_data = {
                "query": query_text,
                "items": [item_text] * item_count,
                "label_token_ids": SCORE_LABEL_TOKEN_IDS,
                "model": SCORE_MODEL_PATH,
            }
            return (index, score_data)
        except Exception as e:
            print(f"Error building request {index}: {e}")
            return (index, None)

    # Generate only the unique requests
    unique_requests = [None] * NUM_UNIQUE_REQUESTS
    # Use ThreadPoolExecutor instead of ProcessPoolExecutor to avoid
    # tokenizer loading issues across processes
    max_workers = min(8, os.cpu_count() or 1)  # Limit to 8 threads max
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(build_request, i)
            for i in tqdm(
                range(NUM_UNIQUE_REQUESTS), desc="Submitting prompt generation tasks"
            )
        ]
        # Collect results as they complete
        for f in tqdm(
            concurrent.futures.as_completed(futures),
            desc="Building unique requests",
            total=NUM_UNIQUE_REQUESTS,
        ):
            try:
                index, req_data = f.result()
            except Exception as e:
                print(f"Error processing request result: {e}")
                continue
            if req_data is None:
                print(f"Failed to build request {index}")
            else:
                unique_requests[index] = req_data

    # Check if we have any valid requests
    valid_requests = [req for req in unique_requests if req is not None]
    if not valid_requests:
        raise RuntimeError("Failed to generate any valid requests")
    print(
        f"Successfully generated {len(valid_requests)} out of "
        f"{NUM_UNIQUE_REQUESTS} unique requests"
    )
    # Create the full request list by cycling through unique requests
    print(
        f"Reusing {len(valid_requests)} unique requests to create "
        f"{num_requests} total requests..."
    )
    all_requests = [
        valid_requests[i % len(valid_requests)]
        for i in tqdm(range(num_requests), desc="Reusing requests")
    ]
    print("All prompts/requests prepared.\n")
    return all_requests
###############################################################################
# PROFILING HELPERS
###############################################################################
async def send_profile_request(profile_text, item_count, session=None):
    """Ask the server to start or stop torch profiling and await the reply.

    Args:
        profile_text: "START_PROFILE" or "STOP_PROFILE". Any other value is
            reported and ignored.
        item_count: Not used; accepted so call sites stay uniform.
        session: An open aiohttp ClientSession. When it is missing (falsy),
            only a message is printed.

    Every failure is printed rather than raised, so profiling problems never
    abort a benchmark run.
    """
    known_routes = (
        ("START_PROFILE", "start_profile"),
        ("STOP_PROFILE", "stop_profile"),
    )
    try:
        if not session:
            print(f"Cannot send {profile_text} request - missing session")
            return
        print(f"Sending {profile_text} request via HTTP...")
        # Strip the trailing "/v1/score" to get the server root URL.
        server_root = HTTP_URL.rsplit("/", 2)[0]
        route = next(
            (path for name, path in known_routes if profile_text == name), None
        )
        if route is None:
            print(f"Unknown profile request: {profile_text}")
            return
        async with session.post(
            f"{server_root}/{route}",
            headers={"Content-Type": "application/json"},
        ) as response:
            body = await response.text()
            if response.status != 200:
                print(
                    f"{profile_text} request failed with status "
                    f"{response.status}: {body}"
                )
            else:
                print(f"{profile_text} request completed")
    except Exception as e:
        print(f"Error sending {profile_text} request: {e}")
###############################################################################
# HTTP CALLS
###############################################################################
def build_http_request_json(score_data):
    """Serialize one score payload into the JSON body sent to /v1/score.

    The payload is expected to look like this (as built during request
    preparation):

        {
            "query": "<text of SCORE_QUERY_TOKENS tokens>",
            "items": ["<item text>", ...],  # SCORE_ITEM_TOKENS tokens each
            "label_token_ids": [token_id1, token_id2],
            "model": "<model path>",
        }

    Args:
        score_data: The payload dict; it is encoded as-is, without validation.

    Returns:
        The JSON-encoded request body as a ``str``.
    """
    body = json.dumps(score_data)
    return body
async def make_http_call(session, score_data, request_id, results_queue):
    """Send one /v1/score request and report its outcome on ``results_queue``.

    Exactly one tuple ``(request_id, elapsed_ms, success, completion_time)``
    is put on the queue per call, whatever the outcome (short of task
    cancellation). The consumer can therefore wait for a fixed number of
    results.

    - ``elapsed_ms`` is 0 for transport errors and non-200 responses.
    - ``completion_time`` is read from the running event loop's clock
      (``loop.time()``).
    - A request counts as successful when the server answers 200 with a JSON
      object containing a "scores" or "logprobs" key.

    Args:
        session: An open aiohttp ClientSession.
        score_data: Payload dict serialized with build_http_request_json.
        request_id: Identifier echoed back in the result tuple.
        results_queue: asyncio.Queue that receives the result tuple.
    """
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    try:
        request_json = build_http_request_json(score_data)
        headers = {"Content-Type": "application/json"}
        async with session.post(HTTP_URL, data=request_json, headers=headers) as resp:
            status = resp.status
            resp_text = await resp.text()
    except Exception as e:
        print(f"[HTTP] Error for request {request_id}: {e}")
        await results_queue.put((request_id, 0, False, loop.time()))
        return
    # Latency covers sending the request and reading the full response body.
    completion_time = loop.time()

    if status != 200:
        print(
            f"[HTTP] Request {request_id} failed with status "
            f"{status}: {resp_text}"
        )
        await results_queue.put((request_id, 0, False, completion_time))
        return

    # Parse score API response; for now only verify its shape.
    try:
        response_data = json.loads(resp_text)
    except json.JSONDecodeError:
        print(f"[HTTP] Request {request_id} failed to parse JSON response")
        success = False
    else:
        # Only a JSON object can carry the expected fields; a bare `in` test
        # on a JSON string or list would match substrings/elements instead.
        success = isinstance(response_data, dict) and (
            "scores" in response_data or "logprobs" in response_data
        )
        if not success:
            print(
                f"[HTTP] Request {request_id} missing expected fields in response"
            )

    elapsed_time = (completion_time - start_time) * 1000
    await results_queue.put((request_id, elapsed_time, success, completion_time))
###############################################################################
# RESULTS
###############################################################################
def _latency_stats(response_times):
    """Summarize latencies as ``(avg, p50, p90, p99)``, or all zeros if empty."""
    if not response_times:
        return 0, 0, 0, 0
    p50, p90, p99 = np.percentile(response_times, [50, 90, 99])
    return mean(response_times), p50, p90, p99


async def process_results(
    results_queue,
    num_requests,
    send_duration,
    total_duration,
    rps,
    duration_secs,
    item_count,
    test_start_time,
):
    """Drain ``num_requests`` results and report them per minute and overall.

    Each queue entry is ``(request_id, elapsed_ms, success, completion_time)``.
    A result is assigned to the minute, counted from ``test_start_time``, in
    which it completed. The report covers at least
    ``ceil(duration_secs / 60)`` minutes. It is extended further when requests
    finish after the planned duration, so that late completions are counted
    instead of dropped. Only successful requests contribute to latency stats.

    Returns:
        A list of dicts, one per minute interval, keyed by the CSV column names.
    """
    all_results = []
    # Collect exactly one result per request sent.
    for _ in range(num_requests):
        request_id, elapsed_time, success, completion_time = await results_queue.get()
        all_results.append(
            {
                "request_id": request_id,
                "elapsed_time": elapsed_time,
                "success": success,
                "completion_time": completion_time,
            }
        )

    # Group results by the minute interval in which they completed (O(n)).
    results_by_minute = {}
    for r in all_results:
        minute_index = int((r["completion_time"] - test_start_time) // 60)
        results_by_minute.setdefault(minute_index, []).append(r)

    num_minutes = int(duration_secs // 60) + (1 if duration_secs % 60 > 0 else 0)
    if results_by_minute:
        # Requests still in flight when the planned duration ends complete
        # later; previously they fell outside every bucket and were dropped.
        num_minutes = max(num_minutes, max(results_by_minute) + 1)

    minute_results = []
    for minute in range(num_minutes):
        minute_data = results_by_minute.get(minute, [])
        response_times = [r["elapsed_time"] for r in minute_data if r["success"]]
        successful_requests = len(response_times)
        failed_requests = len(minute_data) - successful_requests
        avg_response_time, p50, p90, p99 = _latency_stats(response_times)
        minute_result = {
            "test_duration_secs": duration_secs,
            "minute_interval": minute + 1,
            "target_rps": rps,
            "item_count": item_count,
            "server_type": SERVER_TYPE,
            "distribution": DISTRIBUTION,
            "unique_requests": NUM_UNIQUE_REQUESTS,
            "total_requests": len(minute_data),
            "successful_requests": successful_requests,
            "failed_requests": failed_requests,
            "send_duration_secs": send_duration,
            "total_duration_secs": total_duration,
            "avg_response_time_ms": avg_response_time,
            "p50_response_time_ms": p50,
            "p90_response_time_ms": p90,
            "p99_response_time_ms": p99,
        }
        minute_results.append(minute_result)
        print(
            f"\nMinute {minute + 1} Summary for RPS {rps}, "
            f"Duration {duration_secs}s, Item Count {item_count}:"
        )
        print(f"  Requests completed in minute: {len(minute_data)}")
        print(f"  Successful requests: {successful_requests}")
        print(f"  Failed requests: {failed_requests}")
        print(f"  Average response time: {avg_response_time:.2f} ms")
        print(f"  P50 response time: {p50:.2f} ms")
        print(f"  P90 response time: {p90:.2f} ms")
        print(f"  P99 response time: {p99:.2f} ms")

    # Also print overall summary
    all_response_times = [r["elapsed_time"] for r in all_results if r["success"]]
    total_successful = len(all_response_times)
    total_failed = len(all_results) - total_successful
    overall_avg, overall_p50, overall_p90, overall_p99 = _latency_stats(
        all_response_times
    )
    print(
        f"\nOverall Summary for RPS {rps}, Duration {duration_secs}s, "
        f"Item Count {item_count}:"
    )
    print(f"  Test duration: {duration_secs} seconds")
    print(f"  Server type: {SERVER_TYPE}")
    print("  HTTP mode: SINGLE_ITEM_SCORING")
    print(f"  Target RPS: {rps}")
    print(f"  Item count: {item_count}")
    print(f"  Distribution: {DISTRIBUTION}")
    print(f"  Unique requests generated: {NUM_UNIQUE_REQUESTS}")
    print(f"  Total requests sent: {num_requests}")
    print(f"  Successful requests: {total_successful}")
    print(f"  Failed requests: {total_failed}")
    print(f"  Time to send all requests: {send_duration:.2f} seconds")
    print(f"  Time for all requests to complete: {total_duration:.2f} seconds")
    print(f"  Average response time: {overall_avg:.2f} ms")
    print(f"  P50 response time: {overall_p50:.2f} ms")
    print(f"  P90 response time: {overall_p90:.2f} ms")
    print(f"  P99 response time: {overall_p99:.2f} ms\n")
    return minute_results
###############################################################################
# MAIN
###############################################################################
def _inter_arrival_interval(rps):
    """Return the wait in seconds before the next send, per DISTRIBUTION.

    Assumes DISTRIBUTION was already validated by run_benchmark.
    """
    if DISTRIBUTION == "POISSON":
        # For a Poisson process, inter-arrival times follow an exponential
        # distribution with mean 1/rps.
        return random.expovariate(rps)
    return 1 / rps


async def run_benchmark(rps, duration_secs, item_count):
    """Run one open-loop benchmark and return its per-minute result dicts.

    Sends ``int(rps * duration_secs)`` requests, paced by DISTRIBUTION, without
    waiting for earlier responses. It then waits for every request to finish
    and summarizes the results with process_results.

    Raises:
        ValueError: If DISTRIBUTION is not "CONSTANT" or "POISSON". This is
            checked before any tokenizer loading or network traffic.
    """
    num_requests = int(rps * duration_secs)
    print(
        f"Starting benchmark with RPS={rps}, Duration={duration_secs}s, "
        f"Item Count={item_count}, num_requests={num_requests}"
    )
    print(f"Server Type: {SERVER_TYPE}")
    print("HTTP Mode: SINGLE_ITEM_SCORING")
    print(f"Profiling Enabled: {PROFILE}")
    if DISTRIBUTION not in ("CONSTANT", "POISSON"):
        raise ValueError(
            f"Unknown distribution: {DISTRIBUTION}. "
            f"Use 'CONSTANT' or 'POISSON'."
        )

    # Build requests in parallel (unmeasured)
    all_requests = prepare_all_requests_parallel(num_requests, item_count)
    results_queue = asyncio.Queue()
    tasks = []
    loop = asyncio.get_running_loop()

    # HTTP implementation (open source only supports HTTP with /v1/score API).
    # limit=0 removes aiohttp's default 100-connection cap. Otherwise, in-flight
    # requests beyond the cap queue inside the client, and that wait is counted
    # as server latency.
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=0),
        timeout=aiohttp.ClientTimeout(total=300),
    ) as session:
        # Send START_PROFILE if profiling is enabled
        if PROFILE:
            await send_profile_request("START_PROFILE", item_count, session=session)

        # Start the clock only after profiling has started, so the profiler
        # round-trip is not counted as send time.
        send_start_time = loop.time()
        next_send_time = send_start_time
        with tqdm(
            total=len(all_requests),
            desc=f"Sending HTTP score requests at {rps} RPS",
            unit="req",
        ) as pbar:
            for i, score_data in enumerate(all_requests):
                request_id = i + 1
                tasks.append(
                    asyncio.create_task(
                        make_http_call(session, score_data, request_id, results_queue)
                    )
                )
                pbar.update(1)
                if i < len(all_requests) - 1:
                    # Pace against absolute target times so per-iteration
                    # overhead does not accumulate and lower the achieved RPS.
                    # sleep(0) still yields, letting created tasks start when
                    # the sender is behind schedule.
                    next_send_time += _inter_arrival_interval(rps)
                    await asyncio.sleep(max(0.0, next_send_time - loop.time()))
        send_duration = loop.time() - send_start_time

        # Wait for all requests to complete with progress tracking
        print(f"Waiting for {len(tasks)} HTTP score requests to complete...")
        with tqdm(
            total=len(tasks), desc="Completing HTTP score requests", unit="req"
        ) as completion_pbar:
            for task in asyncio.as_completed(tasks):
                await task
                completion_pbar.update(1)
        total_duration = loop.time() - send_start_time

        # Send STOP_PROFILE if profiling is enabled (not included in timings)
        if PROFILE:
            await send_profile_request("STOP_PROFILE", item_count, session=session)

    return await process_results(
        results_queue,
        num_requests,
        send_duration,
        total_duration,
        rps,
        duration_secs,
        item_count,
        send_start_time,
    )
async def main():
    """Run every configured benchmark combination and print a CSV report.

    Combinations are run one after another. The duration is the outermost
    loop, then RPS, then item count. Each run contributes one CSV row per
    minute interval it reports.
    """
    combinations = [
        (duration_secs, rps, item_count)
        for duration_secs in DURATION_SECS_VALUES
        for rps in RPS_VALUES
        for item_count in ITEM_COUNT_VALUES
    ]
    print(
        f"Running benchmarks for {len(DURATION_SECS_VALUES)} duration "
        f"values, {len(RPS_VALUES)} RPS values, and "
        f"{len(ITEM_COUNT_VALUES)} item count values = "
        f"{len(combinations)} total combinations"
    )
    banner = "=" * 80
    config_lines = (
        f"Server Type: {SERVER_TYPE}",
        "HTTP Mode: SINGLE_ITEM_SCORING",
        f"Score API URL: {HTTP_URL}",
        f"Query tokens per request: {SCORE_QUERY_TOKENS}",
        f"Item tokens per item: {SCORE_ITEM_TOKENS}",
        f"Items per request (batch size): {ITEM_COUNT_VALUES}",
        f"Profiling Enabled: {PROFILE}",
        f"Duration values: {DURATION_SECS_VALUES}",
        f"RPS values: {RPS_VALUES}",
        f"Item count values: {ITEM_COUNT_VALUES}",
        banner,
    )
    for line in config_lines:
        print(line)

    minute_rows = []
    for duration_secs, rps, item_count in combinations:
        minute_rows.extend(await run_benchmark(rps, duration_secs, item_count))

    print("\n" + banner)
    print("FINAL CSV RESULTS:")
    print(banner)

    columns = [
        "test_duration_secs",
        "minute_interval",
        "target_rps",
        "item_count",
        "server_type",
        "distribution",
        "unique_requests",
        "total_requests",
        "successful_requests",
        "failed_requests",
        "send_duration_secs",
        "total_duration_secs",
        "avg_response_time_ms",
        "p50_response_time_ms",
        "p90_response_time_ms",
        "p99_response_time_ms",
    ]
    # Timing columns are printed with two decimals; everything else via str().
    two_decimal_columns = set(columns[10:])
    print(",".join(columns))
    for row in minute_rows:
        cells = [
            f"{row[col]:.2f}" if col in two_decimal_columns else str(row[col])
            for col in columns
        ]
        print(",".join(cells))
if __name__ == "__main__":
    # Script entry point: run all configured benchmarks, then print the CSV.
    asyncio.run(main())