Files
2025-08-20 14:29:42 +08:00

332 lines
11 KiB
Python

import copy
import io
import json
import logging
import os
import tarfile
import time
from collections import defaultdict
import requests
from ruamel.yaml import YAML
from typing import Dict, Any
# Root of the SUT helm chart bundled two directories above this file.
sut_chart_root = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
)
logger = logging.getLogger(__file__)
# Optional bearer-token auth header for the leaderboard APIs (None when the
# token env var is unset or empty, so requests sends no Authorization header).
lb_headers = (
    {"Authorization": "Bearer " + os.getenv("LEADERBOARD_API_TOKEN", "")}
    if os.getenv("LEADERBOARD_API_TOKEN")
    else None
)
# Per-pod count of observed image-pull failures.
# BUG FIX: defaultdict() without a factory behaves like a plain dict, so
# `pull_num[pod] += 1` raised KeyError on first use; int() gives a 0 default.
pull_num: defaultdict = defaultdict(int)
# Required configuration; fail fast at import time when missing.
# NOTE(review): `assert` is stripped under `python -O`; switch to explicit
# `raise RuntimeError(...)` if these checks must hold in optimized runs.
JOB_ID = int(os.getenv("JOB_ID", "-1"))
assert JOB_ID != -1
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
assert LOAD_SUT_URL is not None
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
assert GET_JOB_SUT_INFO_URL is not None
def apply_env_to_values(values, envs):
    """Merge *envs* (name -> value mapping) into the helm values' ``env`` list.

    Entries whose ``name`` already exists get their ``value`` overwritten in
    place; unknown names are appended as new ``{"name": ..., "value": ...}``
    items. Returns the mutated *values* dict.
    """
    env_list = values.setdefault("env", [])
    # Map each pre-existing env name to its first position in the list.
    position = {}
    for idx, entry in enumerate(env_list):
        position.setdefault(entry["name"], idx)
    for name, value in envs.items():
        if name in position:
            env_list[position[name]]["value"] = value
        else:
            env_list.append({"name": name, "value": value})
    return values
def merge_values(base_value, incr_value):
    """Recursively fold *incr_value* into *base_value* and return the result.

    Dicts are merged key-wise (recursing on shared keys), lists are
    concatenated in place, and any other combination is replaced by
    *incr_value*. Mutates *base_value* when it is a dict or list.
    """
    if isinstance(base_value, dict) and isinstance(incr_value, dict):
        for key, incoming in incr_value.items():
            if key in base_value:
                base_value[key] = merge_values(base_value[key], incoming)
            else:
                base_value[key] = incoming
        return base_value
    if isinstance(base_value, list) and isinstance(incr_value, list):
        base_value += incr_value
        return base_value
    # Type mismatch or scalar: the incremental value wins outright.
    return incr_value
def gen_chart_tarball(docker_image, policy=None):
    """Pin the docker image (optionally by digest) and build a helm chart tarball.

    Any failure raises and aborts the run.

    Args:
        docker_image (str): docker image reference ("repo:tag").
        policy (str, optional): imagePullPolicy override; the template's
            default is kept when None.

    Returns:
        tuple[BytesIO, dict]: (gzipped helm chart tarball file object,
        rendered values content)

    Raises:
        RuntimeError: when the hash-conversion API fails or the image
            reference is malformed.
    """
    # Load the values template shipped with the chart.
    with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
        yaml = YAML(typ="rt")
        values = yaml.load(fp)
    # Prefer converting the tag to an immutable digest when a conversion
    # service is configured.
    get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
    if get_image_hash_url is not None:
        resp = requests.get(
            get_image_hash_url,
            headers=lb_headers,
            params={"image": docker_image},
            timeout=600,
        )
        # FIX: raise RuntimeError instead of assert (stripped under -O),
        # matching the error handling of the else branch below.
        if resp.status_code != 200:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, API retcode %d"
                % resp.status_code
            )
        resp = resp.json()
        if not resp["success"]:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, response: %s" % str(resp)
            )
        # Digest form splits into [repo, tag, digest] from the right.
        token = resp["data"]["image"].rsplit(":", 2)
        if len(token) != 3:
            raise RuntimeError("Invalid docker image %s" % resp["data"]["image"])
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = ":".join(token[1:])
    else:
        token = docker_image.rsplit(":", 1)
        if len(token) != 2:
            raise RuntimeError("Invalid docker image %s" % docker_image)
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = token[1]
    # BUG FIX: only override pullPolicy when explicitly requested — the
    # original unconditionally wrote None here, clobbering the template's
    # default with a YAML null.
    if policy is not None:
        values["image"]["pullPolicy"] = policy
    # Render values.yaml next to the chart so it is picked up by the tarball.
    with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
        yaml = YAML(typ="rt")
        yaml.dump(values, fp)
    # Package the whole chart directory into an in-memory gzipped tarball.
    tarfp = io.BytesIO()
    with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
        tar.add(
            sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True
        )
    tarfp.seek(0)
    logger.debug(f"Generated chart using values: {values}")
    return tarfp, values
def deploy_chart(
    name_suffix,
    readiness_timeout,
    chart_str=None,
    chart_fileobj=None,
    extra_values=None,
    restart_count_limit=3,
    pullimage_count_limit=3,
):
    """Deploy the SUT and block until it is ready; any failure raises.

    Args:
        name_suffix (str): distinguishes the SUTs when one job runs several.
        readiness_timeout (int): readiness timeout in seconds.
        chart_str (str, optional): chart url; when not None, chart_fileobj is
            ignored. Defaults to None.
        chart_fileobj (BytesIO, optional): helm chart tarball file object,
            used when chart_str is None. Defaults to None.
        extra_values (dict, optional): additional helm values content.
            Defaults to None.
        restart_count_limit (int, optional): SUT restart count limit;
            exceeding it raises. Defaults to 3.
        pullimage_count_limit (int, optional): image-pull attempt limit;
            exceeding it raises. Defaults to 3.

    Returns:
        tuple[str, str]: [k8s domain name for reaching the service,
        name to pass to unload_sut]
    """
    logger.info(
        f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}"
    )
    # Build the deployment request payload.
    payload = {
        "job_id": JOB_ID,
        "resource_name": name_suffix,
        # NOTE(review): lowercase env var name looks intentional — confirm
        # against the deployment environment.
        "priorityclassname": os.environ.get("priorityclassname"),
    }
    extra_values = {} if not extra_values else extra_values
    payload["values"] = json.dumps(extra_values, ensure_ascii=False)
    if chart_str is not None:
        # Chart referenced by URL/string: send it inline in the form data.
        payload["helm_chart"] = chart_str
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            timeout=600,
        )
    else:
        assert (
            chart_fileobj is not None
        ), "Either chart_str or chart_fileobj should be set"
        # Chart supplied as a file object: upload it as a multipart file.
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
            timeout=600,
        )
    if resp.status_code != 200:
        raise RuntimeError(
            "Failed to deploy application status_code %d %s"
            % (resp.status_code, resp.text)
        )
    resp = resp.json()
    if not resp["success"]:
        raise RuntimeError("Failed to deploy application response %r" % resp)
    service_name = resp["data"]["service_name"]
    sut_name = resp["data"]["sut_name"]
    logger.info(f"SUT application deployed with service_name {service_name}")
    # Poll every 10s until the application is ready; check_sut_ready_from_resp
    # raises on terminal failures (timeout / restart limit / pull limit), which
    # is the only other way out of this loop.
    running_at = None
    while True:
        retry_interval = 10
        logger.info(
            f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready..."
        )
        time.sleep(retry_interval)
        check_result, running_at = check_sut_ready_from_resp(
            service_name,
            running_at,
            readiness_timeout,
            restart_count_limit,
            pullimage_count_limit,
        )
        if check_result:
            break
    logger.info(
        f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}"
    )
    return service_name, sut_name
def check_sut_ready_from_resp(
    service_name,
    running_at,
    readiness_timeout,
    restart_count_limit,
    pullimage_count_limit,
):
    """Poll the job-SUT-info API once and decide whether the SUT is ready.

    Args:
        service_name (str): service name, used for log messages only.
        running_at (float | None): epoch seconds when all pods were first
            observed Running, or None if not yet observed.
        readiness_timeout (int): seconds allowed between Running and Ready.
        restart_count_limit (int): max container restarts before aborting.
        pullimage_count_limit (int): max observed image-pull failures per pod
            before aborting.

    Returns:
        tuple[bool, float | None]: (ready, possibly-updated running_at).

    Raises:
        RuntimeError: on terminal pod phases, unexpected phases, restart /
            image-pull limits exceeded, or readiness timeout.
    """
    try:
        resp = requests.get(
            f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
            headers=lb_headers,
            params={"with_detail": True},
            timeout=600,
        )
    except Exception as e:
        # BUG FIX: the exception was passed as a stray positional argument
        # with no matching %-placeholder, so logging dropped it with a
        # formatting error; interpolate it lazily instead.
        logger.warning(
            "Exception occured while getting SUT application %s status: %s",
            service_name,
            e,
        )
        return False, running_at
    if resp.status_code != 200:
        logger.warning(
            f"Get SUT application {service_name} status failed with status_code {resp.status_code}"
        )
        return False, running_at
    resp = resp.json()
    if not resp["success"]:
        logger.warning(
            f"Get SUT application {service_name} status failed with response {resp}"
        )
        return False, running_at
    if len(resp["data"]["sut"]) == 0:
        logger.warning("Empty SUT application status")
        return False, running_at
    # Log a copy of the pod statuses with the bulky "detail" field stripped.
    resp_data_sut = copy.deepcopy(resp["data"]["sut"])
    for status in resp_data_sut:
        del status["detail"]
    logger.info(f"Got SUT application status: {resp_data_sut}")
    for status in resp["data"]["sut"]:
        if status["phase"] in ["Succeeded", "Failed"]:
            raise RuntimeError(
                f"Some pods of SUT application {service_name} terminated with status {status}"
            )
        elif status["phase"] in ["Pending", "Unknown"]:
            return False, running_at
        elif status["phase"] != "Running":
            raise RuntimeError(
                f"Unexcepted pod status {status} of SUT application {service_name}"
            )
        # First time every pod checked so far is Running: start the
        # readiness clock.
        if running_at is None:
            running_at = time.time()
        for ct in status["detail"]["status"]["container_statuses"]:
            if ct["restart_count"] > 0:
                logger.info(
                    f"pod {status['pod_name']} restart count = {ct['restart_count']}"
                )
                if ct["restart_count"] > restart_count_limit:
                    raise RuntimeError(
                        f"pod {status['pod_name']} restart too many times(over {restart_count_limit})"
                    )
            # assumes ct["state"] always carries a "waiting" key (possibly
            # None) — TODO confirm against the API response schema.
            if (
                ct["state"]["waiting"] is not None
                and "reason" in ct["state"]["waiting"]
                and ct["state"]["waiting"]["reason"]
                in ["ImagePullBackOff", "ErrImagePull"]
            ):
                pod_name = status["pod_name"]
                # Robust against pull_num being a plain dict or a factory-less
                # defaultdict (where `+= 1` would raise KeyError on first use).
                pull_num[pod_name] = pull_num.get(pod_name, 0) + 1
                # BUG FIX: the original mixed %-formatting with a literal
                # "{pull_num[...]}" that was never interpolated.
                logger.info(
                    "pod %s has %d times inspect pulling image info: %s",
                    pod_name,
                    pull_num[pod_name],
                    ct["state"]["waiting"],
                )
                if pull_num[pod_name] > pullimage_count_limit:
                    raise RuntimeError(f"pod {pod_name} cannot pull image")
        if not status["conditions"]["Ready"]:
            if running_at is not None and time.time() - running_at > readiness_timeout:
                raise RuntimeError(
                    f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s"
                )
            return False, running_at
    return True, running_at
def resource_check(values: Dict[str, Any], image: str) -> Dict[str, Any]:
    """Fill image and resource constraints for the Iluvatar BI-V100 pool.

    Args:
        values: helm values dict, mutated in place.
        image: docker image reference to deploy.

    Returns:
        The same *values* dict with docker image, pull policy, resource
        limits/requests, node selector and tolerations filled in.
    """
    values["docker_image"] = image
    # BUG FIX: the original assigned "IfNotPresent" and then immediately
    # overwrote it with "Always" — the dead first assignment is removed.
    values["image_pull_policy"] = "Always"
    # Identical limits and requests pin the pod to a guaranteed footprint.
    gpu_resources = {
        "cpu": 4,
        "memory": "16Gi",
        "iluvatar.ai/gpu": "1",
    }
    values["resources"] = {
        "limits": dict(gpu_resources),
        "requests": dict(gpu_resources),
    }
    values["nodeSelector"] = {
        "contest.4pd.io/accelerator": "iluvatar-BI-V100"
    }
    values["tolerations"] = [
        {
            "key": "hosttype",
            "operator": "Equal",
            "value": "iluvatar",
            "effect": "NoSchedule",
        }
    ]
    # TODO: add selection rules for other accelerator types.
    return values