[CI] Refactor PD disaggregation test suite (#11363)
Signed-off-by: Shangming Cai <csmthu@gmail.com>
This commit is contained in:
389
test/srt/test_disaggregation_basic.py
Normal file
389
test/srt/test_disaggregation_basic.py
Normal file
@@ -0,0 +1,389 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
from types import SimpleNamespace
|
||||
|
||||
import requests
|
||||
|
||||
from sglang.test.few_shot_gsm8k import run_eval as run_eval_few_shot_gsm8k
|
||||
from sglang.test.test_disaggregation_utils import TestDisaggregationBase
|
||||
from sglang.test.test_utils import (
|
||||
DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST,
|
||||
DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST,
|
||||
DEFAULT_MODEL_NAME_FOR_TEST,
|
||||
DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
popen_launch_pd_server,
|
||||
)
|
||||
|
||||
|
||||
class TestDisaggregationAccuracy(TestDisaggregationBase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.model = DEFAULT_MODEL_NAME_FOR_TEST
|
||||
|
||||
# Non blocking start servers
|
||||
cls.start_prefill()
|
||||
cls.start_decode()
|
||||
|
||||
# Block until both
|
||||
cls.wait_server_ready(cls.prefill_url + "/health")
|
||||
cls.wait_server_ready(cls.decode_url + "/health")
|
||||
|
||||
cls.launch_lb()
|
||||
|
||||
@classmethod
|
||||
def start_prefill(cls):
|
||||
prefill_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"prefill",
|
||||
"--tp",
|
||||
"1",
|
||||
]
|
||||
prefill_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_prefill = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.prefill_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=prefill_args,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def start_decode(cls):
|
||||
decode_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"decode",
|
||||
"--tp",
|
||||
"1",
|
||||
"--base-gpu-id",
|
||||
"1",
|
||||
]
|
||||
decode_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_decode = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.decode_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=decode_args,
|
||||
)
|
||||
|
||||
def test_gsm8k(self):
|
||||
args = SimpleNamespace(
|
||||
num_shots=5,
|
||||
data_path=None,
|
||||
num_questions=200,
|
||||
max_new_tokens=512,
|
||||
parallel=128,
|
||||
host=f"http://{self.base_host}",
|
||||
port=int(self.lb_port),
|
||||
)
|
||||
metrics = run_eval_few_shot_gsm8k(args)
|
||||
print(f"Evaluation metrics: {metrics}")
|
||||
|
||||
self.assertGreater(metrics["accuracy"], 0.62)
|
||||
|
||||
def test_logprob(self):
|
||||
prompt = "The capital of france is "
|
||||
response = requests.post(
|
||||
self.lb_url + "/generate",
|
||||
json={
|
||||
"text": prompt,
|
||||
"sampling_params": {"temperature": 0},
|
||||
"return_logprob": True,
|
||||
"return_input_logprob": True,
|
||||
"logprob_start_len": 0,
|
||||
},
|
||||
)
|
||||
|
||||
j = response.json()
|
||||
completion_tokens = j["meta_info"]["completion_tokens"]
|
||||
input_logprobs = j["meta_info"]["input_token_logprobs"]
|
||||
output_logprobs = j["meta_info"]["output_token_logprobs"]
|
||||
|
||||
assert (
|
||||
len(output_logprobs) == completion_tokens
|
||||
), f"output_logprobs and completion_tokens should have the same length, but got {len(output_logprobs)} and {completion_tokens}"
|
||||
assert (
|
||||
len(input_logprobs) > 0
|
||||
), f"input_logprobs should have at least one token, but got {len(input_logprobs)}"
|
||||
|
||||
def test_structured_output(self):
|
||||
json_schema = json.dumps(
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string", "pattern": "^[\\w]+$"},
|
||||
"population": {"type": "integer"},
|
||||
},
|
||||
"required": ["name", "population"],
|
||||
}
|
||||
)
|
||||
|
||||
# JSON
|
||||
response = requests.post(
|
||||
f"{self.lb_url}/generate",
|
||||
json={
|
||||
"text": "Here is the information of the capital of France in the JSON format.\n",
|
||||
"sampling_params": {
|
||||
"temperature": 0,
|
||||
"max_new_tokens": 64,
|
||||
"json_schema": json_schema,
|
||||
},
|
||||
},
|
||||
)
|
||||
output = response.json()["text"]
|
||||
# ensure the output is a valid JSON
|
||||
json.loads(output)
|
||||
|
||||
|
||||
class TestDisaggregationMooncakeFailure(TestDisaggregationBase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
# set DISAGGREGATION_TEST_FAILURE_PROB to simulate failure
|
||||
os.environ["DISAGGREGATION_TEST_FAILURE_PROB"] = "0.05"
|
||||
|
||||
cls.model = DEFAULT_MODEL_NAME_FOR_TEST
|
||||
|
||||
# Non blocking start servers
|
||||
cls.start_prefill()
|
||||
cls.start_decode()
|
||||
|
||||
# Block until both
|
||||
cls.wait_server_ready(cls.prefill_url + "/health")
|
||||
cls.wait_server_ready(cls.decode_url + "/health")
|
||||
|
||||
cls.launch_lb()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
os.environ.pop("DISAGGREGATION_TEST_FAILURE_PROB")
|
||||
super().tearDownClass()
|
||||
|
||||
@classmethod
|
||||
def start_prefill(cls):
|
||||
prefill_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"prefill",
|
||||
"--tp",
|
||||
"1",
|
||||
]
|
||||
prefill_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_prefill = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.prefill_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=prefill_args,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def start_decode(cls):
|
||||
decode_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"decode",
|
||||
"--tp",
|
||||
"1",
|
||||
"--base-gpu-id",
|
||||
"1",
|
||||
]
|
||||
decode_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_decode = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.decode_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=decode_args,
|
||||
)
|
||||
|
||||
def test_gsm8k(self):
|
||||
args = SimpleNamespace(
|
||||
num_shots=5,
|
||||
data_path=None,
|
||||
num_questions=200,
|
||||
max_new_tokens=512,
|
||||
parallel=128,
|
||||
host=f"http://{self.base_host}",
|
||||
port=int(self.lb_port),
|
||||
)
|
||||
|
||||
# Expect lots of failure but the server cannot crash
|
||||
try:
|
||||
metrics = run_eval_few_shot_gsm8k(args)
|
||||
print(f"Evaluation metrics: {metrics}")
|
||||
except Exception as e:
|
||||
print(f"Test encountered expected errors: {e}")
|
||||
# Check if servers are still healthy
|
||||
try:
|
||||
response = requests.get(self.prefill_url + "/health_generate")
|
||||
assert response.status_code == 200
|
||||
response = requests.get(self.decode_url + "/health_generate")
|
||||
assert response.status_code == 200
|
||||
except Exception as health_check_error:
|
||||
# If health check fails, re-raise the original exception
|
||||
raise e from health_check_error
|
||||
|
||||
|
||||
class TestDisaggregationMooncakeSpec(TestDisaggregationBase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.model = DEFAULT_EAGLE_TARGET_MODEL_FOR_TEST
|
||||
cls.draft_model = DEFAULT_EAGLE_DRAFT_MODEL_FOR_TEST
|
||||
cls.spec_args = [
|
||||
"--speculative-algorithm",
|
||||
"EAGLE",
|
||||
"--speculative-draft-model-path",
|
||||
cls.draft_model,
|
||||
"--speculative-num-steps",
|
||||
"3",
|
||||
"--speculative-eagle-topk",
|
||||
"4",
|
||||
"--speculative-num-draft-tokens",
|
||||
"16",
|
||||
"--cuda-graph-max-bs",
|
||||
"8",
|
||||
]
|
||||
print(f"{cls.base_host=} {cls.lb_port=} {cls.prefill_port=} {cls.decode_port=}")
|
||||
|
||||
# Non blocking start servers
|
||||
cls.start_prefill()
|
||||
cls.start_decode()
|
||||
|
||||
# Block until both
|
||||
cls.wait_server_ready(cls.prefill_url + "/health")
|
||||
cls.wait_server_ready(cls.decode_url + "/health")
|
||||
|
||||
cls.launch_lb()
|
||||
|
||||
@classmethod
|
||||
def start_prefill(cls):
|
||||
prefill_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"prefill",
|
||||
"--tp",
|
||||
"1",
|
||||
] + cls.spec_args
|
||||
prefill_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_prefill = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.prefill_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=prefill_args,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def start_decode(cls):
|
||||
decode_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"decode",
|
||||
"--tp",
|
||||
"1",
|
||||
"--base-gpu-id",
|
||||
"1",
|
||||
] + cls.spec_args
|
||||
decode_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_decode = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.decode_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=decode_args,
|
||||
)
|
||||
|
||||
def test_gsm8k(self):
|
||||
args = SimpleNamespace(
|
||||
num_shots=5,
|
||||
data_path=None,
|
||||
num_questions=200,
|
||||
max_new_tokens=512,
|
||||
parallel=2,
|
||||
host=f"http://{self.base_host}",
|
||||
port=int(self.lb_port),
|
||||
)
|
||||
metrics = run_eval_few_shot_gsm8k(args)
|
||||
print(f"Evaluation metrics: {metrics}")
|
||||
|
||||
self.assertGreater(metrics["accuracy"], 0.20)
|
||||
|
||||
|
||||
class TestDisaggregationSimulatedRetract(TestDisaggregationBase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
os.environ["SGLANG_TEST_RETRACT"] = "true"
|
||||
cls.model = DEFAULT_MODEL_NAME_FOR_TEST
|
||||
|
||||
# Non blocking start servers
|
||||
cls.start_prefill()
|
||||
cls.start_decode()
|
||||
|
||||
# Block until both
|
||||
cls.wait_server_ready(cls.prefill_url + "/health")
|
||||
cls.wait_server_ready(cls.decode_url + "/health")
|
||||
|
||||
cls.launch_lb()
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
os.environ.pop("SGLANG_TEST_RETRACT")
|
||||
super().tearDownClass()
|
||||
|
||||
@classmethod
|
||||
def start_prefill(cls):
|
||||
prefill_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"prefill",
|
||||
"--tp",
|
||||
"1",
|
||||
]
|
||||
prefill_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_prefill = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.prefill_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=prefill_args,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def start_decode(cls):
|
||||
decode_args = [
|
||||
"--trust-remote-code",
|
||||
"--disaggregation-mode",
|
||||
"decode",
|
||||
"--tp",
|
||||
"1",
|
||||
"--base-gpu-id",
|
||||
"1",
|
||||
]
|
||||
decode_args += cls.transfer_backend + cls.rdma_devices
|
||||
cls.process_decode = popen_launch_pd_server(
|
||||
cls.model,
|
||||
cls.decode_url,
|
||||
timeout=DEFAULT_TIMEOUT_FOR_SERVER_LAUNCH,
|
||||
other_args=decode_args,
|
||||
)
|
||||
|
||||
def test_gsm8k(self):
|
||||
args = SimpleNamespace(
|
||||
num_shots=5,
|
||||
data_path=None,
|
||||
num_questions=200,
|
||||
max_new_tokens=512,
|
||||
parallel=128,
|
||||
host=f"http://{self.base_host}",
|
||||
port=int(self.lb_port),
|
||||
)
|
||||
metrics = run_eval_few_shot_gsm8k(args)
|
||||
print(f"Evaluation metrics: {metrics}")
|
||||
|
||||
self.assertGreater(metrics["accuracy"], 0.62)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user