bench_serving support PD Disaggregation (#11542)

This commit is contained in:
Xiaoyu Zhang
2025-10-14 10:43:26 +08:00
committed by GitHub
parent cb8ed2c09a
commit 88a6f9dab5
3 changed files with 146 additions and 10 deletions

View File

@@ -622,6 +622,48 @@ async def async_request_profile(api_url: str) -> RequestFuncOutput:
return output
def _build_profile_urls(
profile_prefill_url: Optional[List[str]],
profile_decode_url: Optional[List[str]],
) -> List[Tuple[str, str]]:
"""Build profile URLs list from prefill/decode URL arguments.
Returns:
List of (worker_type, url) tuples. e.g., [("Prefill-0", "http://..."), ("Decode-0", "http://...")]
"""
profile_urls = []
if profile_prefill_url:
for idx, url in enumerate(profile_prefill_url):
profile_urls.append((f"Prefill-{idx}", url))
if profile_decode_url:
for idx, url in enumerate(profile_decode_url):
profile_urls.append((f"Decode-{idx}", url))
return profile_urls
async def _call_profile_pd(profile_urls: List[Tuple[str, str]], mode: str) -> None:
"""Call profile endpoint (start/stop) on PD separated workers.
Args:
profile_urls: List of (worker_type, url) tuples
mode: "start" or "stop"
"""
endpoint = "/start_profile" if mode == "start" else "/stop_profile"
action = "Starting" if mode == "start" else "Stopping"
action_past = "started" if mode == "start" else "stopped"
print(f"{action} profiler...")
for worker_type, url in profile_urls:
profile_output = await async_request_profile(api_url=url + endpoint)
if profile_output.success:
print(f"Profiler {action_past} for {worker_type} worker at {url}")
else:
print(
f"Failed to {mode} profiler for {worker_type} worker at {url}: {profile_output.error}"
)
def get_model(pretrained_model_name_or_path: str) -> str:
if os.getenv("SGLANG_USE_MODELSCOPE", "false").lower() == "true":
import huggingface_hub.constants
@@ -1675,6 +1717,8 @@ async def benchmark(
use_trace_timestamps: bool = False,
mooncake_slowdown_factor=1.0,
mooncake_num_rounds=1,
profile_prefill_url: Optional[List[str]] = None,
profile_decode_url: Optional[List[str]] = None,
):
if backend in ASYNC_REQUEST_FUNCS:
request_func = ASYNC_REQUEST_FUNCS[backend]
@@ -1764,14 +1808,28 @@ async def benchmark(
time.sleep(1.0)
# Build profile URLs for PD separated mode (do this once at the beginning)
pd_profile_urls = []
if profile and pd_separated:
pd_profile_urls = _build_profile_urls(profile_prefill_url, profile_decode_url)
if not pd_profile_urls:
print(
"Warning: PD separated mode requires --profile-prefill-url or --profile-decode-url"
)
print("Skipping profiler start. Please specify worker URLs for profiling.")
# Start profiler
if profile:
print("Starting profiler...")
profile_output = await async_request_profile(
api_url=base_url + "/start_profile"
)
if profile_output.success:
print("Profiler started")
if pd_separated:
if pd_profile_urls:
await _call_profile_pd(pd_profile_urls, "start")
else:
print("Starting profiler...")
profile_output = await async_request_profile(
api_url=base_url + "/start_profile"
)
if profile_output.success:
print("Profiler started")
# Run all requests
benchmark_start_time = time.perf_counter()
@@ -1820,10 +1878,16 @@ async def benchmark(
# Stop profiler
if profile:
print("Stopping profiler...")
profile_output = await async_request_profile(api_url=base_url + "/stop_profile")
if profile_output.success:
print("Profiler stopped")
if pd_separated:
if pd_profile_urls:
await _call_profile_pd(pd_profile_urls, "stop")
else:
print("Stopping profiler...")
profile_output = await async_request_profile(
api_url=base_url + "/stop_profile"
)
if profile_output.success:
print("Profiler stopped")
if pbar is not None:
pbar.close()
@@ -2204,6 +2268,8 @@ def run_benchmark(args_: argparse.Namespace):
use_trace_timestamps=args.use_trace_timestamps,
mooncake_slowdown_factor=args.mooncake_slowdown_factor,
mooncake_num_rounds=args.mooncake_num_rounds,
profile_prefill_url=getattr(args, "profile_prefill_url", None),
profile_decode_url=getattr(args, "profile_decode_url", None),
)
)
@@ -2429,6 +2495,30 @@ if __name__ == "__main__":
action="store_true",
help="Benchmark PD disaggregation server",
)
# Create a mutually exclusive group for profiling URLs
# In PD separated mode, prefill and decode workers must be profiled separately
profile_url_group = parser.add_mutually_exclusive_group()
profile_url_group.add_argument(
"--profile-prefill-url",
type=str,
nargs="*",
default=None,
help="URL(s) of the prefill worker(s) for profiling in PD separated mode. "
"Can specify multiple URLs: --profile-prefill-url http://localhost:30000 http://localhost:30001. "
"NOTE: Cannot be used together with --profile-decode-url. "
"In PD separated mode, prefill and decode workers must be profiled separately.",
)
profile_url_group.add_argument(
"--profile-decode-url",
type=str,
nargs="*",
default=None,
help="URL(s) of the decode worker(s) for profiling in PD separated mode. "
"Can specify multiple URLs: --profile-decode-url http://localhost:30010 http://localhost:30011. "
"NOTE: Cannot be used together with --profile-prefill-url. "
"In PD separated mode, prefill and decode workers must be profiled separately.",
)
parser.add_argument(
"--flush-cache",
action="store_true",