[CI] Enable auto upgrade e2e estimated time for auto-partition suites (#6840)

### What this PR does / why we need it?
This patch adds a schedule-triggered workflow that automatically updates the
e2e estimated times for better load balancing.
1. The workflow will run the full e2e test to get the duration of each
test.
2. The script `update_estimated_time.py` will update the
[config.yaml](https://github.com/vllm-project/vllm-ascend/blob/main/.github/workflows/scripts/config.yaml)
according to the latest timings
3. The workflow will automatically submit a pull request that includes the
changes to `config.yaml`
<img width="2484" height="764" alt="image"
src="https://github.com/user-attachments/assets/02f3459c-bb3b-4f8e-9966-8bb2e5c1bbea"
/>


### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.15.0
- vLLM main:
83b47f67b1
- 
### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

- vLLM version: v0.15.0
- vLLM main:
83b47f67b1

---------

Signed-off-by: wangli <wangli858794774@gmail.com>
This commit is contained in:
Li Wang
2026-03-04 10:38:34 +08:00
committed by GitHub
parent c7fd7a25f7
commit d431d7d526
5 changed files with 575 additions and 262 deletions

View File

@@ -1,244 +1,239 @@
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import tabulate
import yaml
from ci_utils import TestFile, run_e2e_files
from ci_utils import TestFile, TestRecord, run_tests
_CONFIG_PATH = Path(__file__).parent / "config.yaml"
def load_suites_from_config(config_path: str = "config.yaml") -> dict[str, list[TestFile]]:
    """Read the YAML config (resolved relative to this script's directory).

    Returns a mapping of suite name -> list of TestFile entries. Missing
    ``estimated_time`` defaults to 60 seconds and missing ``is_skipped``
    defaults to False.
    """
    cfg = Path(__file__).parent / config_path
    with open(cfg) as fh:
        raw = yaml.safe_load(fh)
    return {
        suite: [
            TestFile(
                item.get("name"),
                item.get("estimated_time", 60),
                item.get("is_skipped", False),
            )
            for item in items
        ]
        for suite, items in raw.items()
    }
def load_suites(config_path: Path = _CONFIG_PATH) -> dict[str, list[TestFile]]:
    """Load all test suites from config.yaml."""
    raw = yaml.safe_load(config_path.read_text())
    suites: dict[str, list[TestFile]] = {}
    for suite_name, entries in raw.items():
        suites[suite_name] = [
            TestFile(
                name=entry["name"],
                estimated_time=entry.get("estimated_time", 60),
                is_skipped=entry.get("is_skipped", False),
            )
            for entry in entries
        ]
    return suites
# Eagerly load suites at import time so CLI defaults/choices can reference them.
suites = load_suites_from_config()
def partition(files: list[TestFile], rank: int, size: int) -> list[TestFile]:
    """
    Split non-skipped files into `size` groups of approximately equal estimated
    time using a greedy algorithm, and return the group at index `rank`.

    Files within the returned group are sorted ascending by estimated_time.

    Args:
        files: test files; entries with is_skipped=True are ignored.
        rank: zero-based index of the partition to return (0 to size-1).
        size: total number of partitions.

    Returns:
        The files assigned to `rank`, or [] when arguments are out of range
        (no active files, size <= 0, or size > number of active files).
    """
    active = [f for f in files if not f.is_skipped]
    if not active or size <= 0 or size > len(active):
        return []

    # Sort descending by weight; use original index as tiebreaker so the
    # assignment is stable for equal estimated times.
    indexed = sorted(enumerate(active), key=lambda x: (-x[1].estimated_time, x[0]))

    buckets: list[list[int]] = [[] for _ in range(size)]
    sums = [0.0] * size
    # Greedy: always drop the next-heaviest file into the currently lightest bucket.
    for idx, test in indexed:
        lightest = sums.index(min(sums))
        buckets[lightest].append(idx)
        sums[lightest] += test.estimated_time

    return sorted([active[i] for i in buckets[rank]], key=lambda f: f.estimated_time)


# Backward-compatible alias for the previous public name.
auto_partition = partition
def _get_disk_covered_dirs(all_suite_files: set[str], project_root: Path | str) -> list[str]:
covered_dirs = set()
for file_path in all_suite_files:
# e.g. tests/e2e/singlecard/test_foo.py -> tests/e2e/singlecard
parent_dir = (project_root / file_path).parent if os.path.isfile(file_path) else (project_root / file_path)
if parent_dir.exists():
# Store relative path to project root
try:
rel_dir = parent_dir.relative_to(project_root)
# Check if this directory is already covered by a parent directory
is_covered = False
for existing_dir in list(covered_dirs):
# If existing_dir is a parent of rel_dir, rel_dir is already covered
if existing_dir in rel_dir.parents or existing_dir == rel_dir:
is_covered = True
break
# If rel_dir is a parent of existing_dir, replace existing_dir with rel_dir
elif rel_dir in existing_dir.parents:
covered_dirs.remove(existing_dir)
# We continue checking other existing_dirs, but we know rel_dir should be added
# unless another parent covers it (which is handled by the first if block logic effectively
# but we need to be careful with modification during iteration, so we use list copy)
if not is_covered:
covered_dirs.add(rel_dir)
except ValueError:
pass
return covered_dirs
def _find_project_root() -> Path:
    """Locate the repository root: prefer cwd when it contains a tests/ dir."""
    cwd = Path.cwd()
    if (cwd / "tests").exists():
        return cwd
    # Fall back: assume this script lives at .github/workflows/scripts/
    return Path(__file__).parents[3]
def _sanity_check_suites(suites: dict[str, list[TestFile]]):
def _minimal_covered_dirs(file_paths: set[str], root: Path) -> set[Path]:
"""Return the minimal set of directories that covers all file_paths."""
dirs: set[Path] = set()
for fp in file_paths:
candidate = (root / fp).parent
if not candidate.exists():
continue
try:
rel = candidate.relative_to(root)
except ValueError:
continue
# Drop any existing entries that are subdirectories of rel
dirs = {d for d in dirs if rel not in d.parents}
# Only add rel if no ancestor already covers it
if not any(d == rel or d in rel.parents for d in dirs):
dirs.add(rel)
return dirs
def sanity_check(suites: dict[str, list[TestFile]]) -> None:
    """
    Verify that:
    1. Every test file in any suite exists on disk.
    2. No test_*.py files exist on disk (in covered dirs) that are absent from all suites.

    Raises SystemExit with a descriptive message on failure.

    (Reconstructed from a garbled diff interleave of the old `_sanity_check_suites`
    and the new `sanity_check`; the new behavior is kept.)
    """
    # Normalize away any "::test_case" selector suffix.
    suite_files = {f.name.split("::")[0] for tests in suites.values() for f in tests}
    root = _find_project_root()
    covered = _minimal_covered_dirs(suite_files, root)
    # Every test_*.py on disk under the covered directories.
    disk_files = {str(p.relative_to(root)) for d in covered for p in (root / d).rglob("test_*.py")}

    missing_from_suite = sorted(disk_files - suite_files)
    if missing_from_suite:
        entries = "\n".join(f'    TestFile("{f}"),' for f in missing_from_suite)
        raise SystemExit(f"Test files on disk are not in any suite (add them or mark is_skipped=True):\n{entries}")

    missing_from_disk = sorted(suite_files - disk_files)
    if missing_from_disk:
        entries = "\n".join(f'    TestFile("{f}"),' for f in missing_from_disk)
        raise SystemExit(f"Test files listed in suite do not exist on disk:\n{entries}")
def _print_plan(
    suite: str,
    files: list[TestFile],
    skipped: list[TestFile],
    partition_info: str,
) -> None:
    """Print the execution plan: suite/partition table, enabled and skipped files."""
    out = [tabulate.tabulate([[suite, partition_info]], headers=["Suite", "Partition"], tablefmt="psql")]
    est_total = sum(f.estimated_time for f in files)
    out.append(f"✅ Enabled {len(files)} test(s) (est. total {est_total:.1f}s):")
    out.extend(f" - {f.name} (est={f.estimated_time}s)" for f in files)
    if skipped:
        out.append(f"\n❌ Skipped {len(skipped)} test(s) (consider recovering):")
        out.extend(f" - {f.name}" for f in skipped)
    out.append("")
    print("\n".join(out), flush=True)
def _print_results(
    suite: str,
    records: list[TestRecord],
    skipped: list[TestFile],
    partition_info: str,
) -> None:
    """Print per-test pass/fail results with actual vs. estimated durations.

    (The unterminated assert residue from the removed old sanity check, left
    by the diff interleave, has been dropped.)
    """
    print(tabulate.tabulate([[suite, partition_info]], headers=["Suite", "Partition"], tablefmt="psql"))
    total_elapsed = sum(r.elapsed for r in records)
    passed_count = sum(1 for r in records if r.passed)
    print(f"Results: {passed_count}/{len(records)} passed (actual total {total_elapsed:.1f}s):")
    for r in records:
        status = "✅ PASSED" if r.passed else "❌ FAILED"
        print(f" {status} {r.name} (actual={r.elapsed:.0f}s est={r.estimated:.0f}s)")
    if skipped:
        print(f"\n❌ Skipped {len(skipped)} test(s) (consider recovering):")
        for f in skipped:
            print(f" - {f.name}")
    print(flush=True)
def _save_timing_json(
records: list[TestRecord],
suite: str,
partition_id: int | None,
partition_size: int | None,
output_path: Path,
) -> None:
passed_suites = [r.to_dict() for r in records if r.passed]
payload = {
"suite": suite,
"partition_id": partition_id,
"partition_size": partition_size,
"commit_sha": os.environ.get("GITHUB_SHA", ""),
"github_run_id": os.environ.get("GITHUB_RUN_ID", ""),
"timestamp": datetime.now(timezone.utc).isoformat(),
"tests": passed_suites,
}
output_path.write_text(json.dumps(payload, indent=2))
print(
f"Timing data written to {output_path} ({len(passed_suites)}/{len(records)} passed)",
flush=True,
)
def main() -> None:
    """CLI entry point: select a suite (optionally one partition of it), run it,
    save timing data, and exit with the aggregate status code.

    (Reconstructed from a garbled diff interleave; the new argparse-based flow
    is kept. Also fixes --continue-on-error, which was declared as store_true
    with default=True and therefore could never be disabled.)
    """
    suites = load_suites()

    parser = argparse.ArgumentParser(description="Run a named e2e test suite")
    parser.add_argument(
        "--suite",
        type=str,
        required=True,
        choices=list(suites.keys()),
        help="Name of the test suite to run",
    )
    parser.add_argument(
        "--auto-partition-id",
        type=int,
        default=None,
        metavar="ID",
        help="Zero-based partition index (requires --auto-partition-size)",
    )
    parser.add_argument(
        "--auto-partition-size",
        type=int,
        default=None,
        metavar="N",
        help="Total number of partitions",
    )
    parser.add_argument(
        "--auto-upgrade-estimated-times",
        action="store_true",
        help="Automatically update estimated times in config.yaml based on actual timings. "
        "If enabled, the script always exits with 0, even if some tests fail, since the "
        "primary purpose is to gather timing data to improve estimates.",
    )
    # BooleanOptionalAction keeps the always-on default but also provides
    # --no-continue-on-error for opting out.
    parser.add_argument(
        "--continue-on-error",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Continue running after a test failure (default: True)",
    )
    parser.add_argument(
        "--timing-report-json",
        type=Path,
        default=Path("test_timing_data.json"),
        help="Path to write the JSON timing data for CI aggregation",
    )
    args = parser.parse_args()

    sanity_check(suites)

    all_files = suites[args.suite]
    skipped = [f for f in all_files if f.is_skipped]
    if args.auto_partition_size is not None:
        files = partition(all_files, args.auto_partition_id, args.auto_partition_size)
        partition_info = f"{args.auto_partition_id + 1}/{args.auto_partition_size}"
    else:
        files = [f for f in all_files if not f.is_skipped]
        partition_info = "full"

    _print_plan(args.suite, files, skipped, partition_info)

    exit_code, records = run_tests(files, continue_on_error=args.continue_on_error)

    _save_timing_json(records, args.suite, args.auto_partition_id, args.auto_partition_size, args.timing_report_json)
    _print_results(args.suite, records, skipped, partition_info)

    # When harvesting timing data we never fail the job; the data is the point.
    if args.auto_upgrade_estimated_times:
        sys.exit(0)
    sys.exit(exit_code)
if __name__ == "__main__":