# NOTE(review): removed Git web-UI scrape artifacts ("Files", "332 lines",
# "Raw Permalink Normal View History", timestamp) that made this module unparseable.
import copy
import io
import json
import logging
import os
import tarfile
import time
from collections import defaultdict
import requests
from ruamel.yaml import YAML
from typing import Dict, Any
# Root of the bundled SUT helm chart: <repo>/helm-chart/sut relative to this file.
sut_chart_root = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
)
logger = logging.getLogger(__name__)  # __name__ (not __file__) is the logging convention
# Authorization header for leaderboard API calls, or None when no token is configured.
lb_headers = (
    {"Authorization": "Bearer " + os.getenv("LEADERBOARD_API_TOKEN", "")}
    if os.getenv("LEADERBOARD_API_TOKEN")
    else None
)
# Per-pod count of observed image-pull failures.
# BUG FIX: defaultdict() with no default factory raises KeyError on
# `pull_num[pod] += 1` (see check_sut_ready_from_resp); `int` gives a 0 default.
pull_num: defaultdict = defaultdict(int)
# Required environment; fail fast at import time with explicit errors
# (asserts are stripped under `python -O`, so raise instead).
JOB_ID = int(os.getenv("JOB_ID", "-1"))
if JOB_ID == -1:
    raise RuntimeError("JOB_ID environment variable must be set")
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
if LOAD_SUT_URL is None:
    raise RuntimeError("LOAD_SUT_URL environment variable must be set")
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
if GET_JOB_SUT_INFO_URL is None:
    raise RuntimeError("GET_JOB_SUT_INFO_URL environment variable must be set")
def apply_env_to_values(values, envs):
    """Merge *envs* (name -> value mapping) into the ``env`` list of a helm values dict.

    Existing entries are updated in place (first occurrence wins on duplicate
    names); unknown names are appended as new ``{"name": ..., "value": ...}``
    items. The input dict is mutated and also returned for chaining.
    """
    env_list = values.setdefault("env", [])
    # map each pre-existing env name to the index of its first occurrence
    position = {}
    for idx, entry in enumerate(env_list):
        position.setdefault(entry["name"], idx)
    for name, value in envs.items():
        if name in position:
            env_list[position[name]]["value"] = value
        else:
            env_list.append({"name": name, "value": value})
    return values
def merge_values(base_value, incr_value):
    """Recursively merge *incr_value* into *base_value* and return the result.

    Dicts are merged key-by-key (recursing on shared keys), lists are
    concatenated in place, and any other type combination is replaced
    wholesale by *incr_value*. Mutates *base_value* where possible.
    """
    if isinstance(base_value, dict) and isinstance(incr_value, dict):
        for key, incoming in incr_value.items():
            if key in base_value:
                base_value[key] = merge_values(base_value[key], incoming)
            else:
                base_value[key] = incoming
        return base_value
    if isinstance(base_value, list) and isinstance(incr_value, list):
        base_value += incr_value
        return base_value
    # mismatched or scalar types: the incremental value wins
    return incr_value
def gen_chart_tarball(docker_image, policy=None):
    """Resolve the docker image (tag -> digest when possible) and build the SUT helm chart tarball.

    Fails with an exception on any error (bad API response, malformed image).

    Args:
        docker_image (str): docker image reference, "repo:tag".
        policy (str, optional): imagePullPolicy written into values. Defaults to None.

    Returns:
        tuple[io.BytesIO, dict]: (gzipped chart tarball file object, rendered values dict)
    """
    yaml = YAML(typ="rt")  # round-trip mode preserves template comments/ordering
    # load values template
    with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
        values = yaml.load(fp)
    # update docker_image
    get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
    if get_image_hash_url is not None:
        # convert tag to hash for docker_image so the deployed image is pinned.
        # NOTE: raise RuntimeError (not assert) so validation survives `python -O`,
        # consistent with the else-branch below.
        resp = requests.get(
            get_image_hash_url,
            headers=lb_headers,
            params={"image": docker_image},
            timeout=600,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, API retcode %d"
                % resp.status_code
            )
        resp = resp.json()
        if not resp["success"]:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, response: %s" % str(resp)
            )
        # expected form "repo:tag@sha256:digest": rsplit(":", 2) yields 3 tokens
        token = resp["data"]["image"].rsplit(":", 2)
        if len(token) != 3:
            raise RuntimeError("Invalid docker image %s" % resp["data"]["image"])
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = ":".join(token[1:])
    else:
        token = docker_image.rsplit(":", 1)
        if len(token) != 2:
            raise RuntimeError("Invalid docker image %s" % docker_image)
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = token[1]
    values["image"]["pullPolicy"] = policy
    # output values.yaml next to the template so it is picked up by the tarball
    with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
        yaml.dump(values, fp)
    # tarball the whole chart directory
    tarfp = io.BytesIO()
    with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
        tar.add(
            sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True
        )
    tarfp.seek(0)
    logger.debug(f"Generated chart using values: {values}")
    return tarfp, values
def deploy_chart(
    name_suffix,
    readiness_timeout,
    chart_str=None,
    chart_fileobj=None,
    extra_values=None,
    restart_count_limit=3,
    pullimage_count_limit=3,
):
    """Deploy a SUT via the leaderboard load-SUT API and block until it is ready.

    Raises on any failure instead of returning an error value.

    Args:
        name_suffix (str): distinguishes SUTs when one job deploys several.
        readiness_timeout (int): readiness timeout in seconds.
        chart_str (str, optional): chart URL; when set, chart_fileobj is ignored.
            Defaults to None.
        chart_fileobj (BytesIO, optional): helm chart tarball file object, used
            when chart_str is None. Defaults to None.
        extra_values (dict, optional): extra helm values merged by the server.
            Defaults to None.
        restart_count_limit (int, optional): abort when a SUT pod restarts more
            than this many times. Defaults to 3.
        pullimage_count_limit (int, optional): abort when image pulls fail more
            than this many times. Defaults to 3.

    Returns:
        tuple[str, str]: (k8s service name for reaching the SUT, sut name for unload_sut)
    """
    logger.info(
        f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}"
    )
    # deploy
    payload = {
        "job_id": JOB_ID,
        "resource_name": name_suffix,
        "priorityclassname": os.environ.get("priorityclassname"),
    }
    extra_values = {} if not extra_values else extra_values
    payload["values"] = json.dumps(extra_values, ensure_ascii=False)
    if chart_str is not None:
        # chart referenced by URL: send it as a plain form field
        payload["helm_chart"] = chart_str
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            timeout=600,
        )
    else:
        assert (
            chart_fileobj is not None
        ), "Either chart_str or chart_fileobj should be set"
        # chart supplied inline: upload the tarball as a multipart file
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
            timeout=600,
        )
    if resp.status_code != 200:
        raise RuntimeError(
            "Failed to deploy application status_code %d %s"
            % (resp.status_code, resp.text)
        )
    resp = resp.json()
    if not resp["success"]:
        raise RuntimeError("Failed to deploy application response %r" % resp)
    service_name = resp["data"]["service_name"]
    sut_name = resp["data"]["sut_name"]
    logger.info(f"SUT application deployed with service_name {service_name}")
    # waiting for appliation ready; check_sut_ready_from_resp raises on fatal
    # pod states, so this loop only exits via `break` or an exception
    running_at = None
    while True:
        retry_interval = 10
        logger.info(
            f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready..."
        )
        time.sleep(retry_interval)
        # running_at is threaded through calls: set once pods reach Running,
        # used to enforce readiness_timeout
        check_result, running_at = check_sut_ready_from_resp(
            service_name,
            running_at,
            readiness_timeout,
            restart_count_limit,
            pullimage_count_limit,
        )
        if check_result:
            break
    logger.info(
        f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}"
    )
    return service_name, sut_name
def check_sut_ready_from_resp(
    service_name,
    running_at,
    readiness_timeout,
    restart_count_limit,
    pullimage_count_limit,
):
    """Poll the job SUT-info API once and decide whether every pod is ready.

    Args:
        service_name (str): k8s service name, used for logging/error messages.
        running_at (float | None): timestamp when pods were first seen Running,
            or None; threaded through successive calls by deploy_chart.
        readiness_timeout (int): max seconds allowed between Running and Ready.
        restart_count_limit (int): abort when a container restarts more than this.
        pullimage_count_limit (int): abort when image pulls fail more than this.

    Returns:
        tuple[bool, float | None]: (all pods ready, updated running_at)

    Raises:
        RuntimeError: pod terminated, unexpected phase, restart/image-pull
            limits exceeded, or readiness timeout exceeded.
    """
    try:
        resp = requests.get(
            f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
            headers=lb_headers,
            params={"with_detail": True},
            timeout=600,
        )
    except Exception as e:
        # BUG FIX: `e` was previously passed as a stray logging argument with
        # no placeholder, producing a "not all arguments converted" log error.
        logger.warning(
            "Exception occured while getting SUT application %s status: %s",
            service_name,
            e,
        )
        return False, running_at
    if resp.status_code != 200:
        logger.warning(
            f"Get SUT application {service_name} status failed with status_code {resp.status_code}"
        )
        return False, running_at
    resp = resp.json()
    if not resp["success"]:
        logger.warning(
            f"Get SUT application {service_name} status failed with response {resp}"
        )
        return False, running_at
    if len(resp["data"]["sut"]) == 0:
        logger.warning("Empty SUT application status")
        return False, running_at
    # log a copy of the pod list with the verbose "detail" field stripped
    resp_data_sut = copy.deepcopy(resp["data"]["sut"])
    for status in resp_data_sut:
        del status["detail"]
    logger.info(f"Got SUT application status: {resp_data_sut}")
    for status in resp["data"]["sut"]:
        if status["phase"] in ["Succeeded", "Failed"]:
            raise RuntimeError(
                f"Some pods of SUT application {service_name} terminated with status {status}"
            )
        elif status["phase"] in ["Pending", "Unknown"]:
            return False, running_at
        elif status["phase"] != "Running":
            raise RuntimeError(
                f"Unexpected pod status {status} of SUT application {service_name}"
            )
        # start the readiness clock the first time pods reach Running
        if running_at is None:
            running_at = time.time()
        for ct in status["detail"]["status"]["container_statuses"]:
            if ct["restart_count"] > 0:
                logger.info(
                    f"pod {status['pod_name']} restart count = {ct['restart_count']}"
                )
                if ct["restart_count"] > restart_count_limit:
                    raise RuntimeError(
                        f"pod {status['pod_name']} restart too many times(over {restart_count_limit})"
                    )
            if (
                ct["state"]["waiting"] is not None
                and "reason" in ct["state"]["waiting"]
                and ct["state"]["waiting"]["reason"]
                in ["ImagePullBackOff", "ErrImagePull"]
            ):
                # BUG FIX: use .get() so this works even when pull_num is a
                # defaultdict created without a default factory (plain
                # `pull_num[...] += 1` raised KeyError on the first failure).
                pull_num[status["pod_name"]] = pull_num.get(status["pod_name"], 0) + 1
                # BUG FIX: the original mixed an f-string placeholder into a
                # %-format string, so the count was printed literally.
                logger.info(
                    "pod %s has %d times inspect pulling image info: %s",
                    status["pod_name"],
                    pull_num[status["pod_name"]],
                    ct["state"]["waiting"],
                )
                if pull_num[status["pod_name"]] > pullimage_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
        if not status["conditions"]["Ready"]:
            if running_at is not None and time.time() - running_at > readiness_timeout:
                raise RuntimeError(
                    f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s"
                )
            return False, running_at
    return True, running_at
def resource_check(values: Dict[str, Any], image: str) -> Dict[str, Any]:
    """Fill *values* with the docker image and Iluvatar-GPU scheduling settings.

    Args:
        values: helm values dict; mutated in place.
        image: docker image reference to deploy.

    Returns:
        The same *values* dict, for chaining.
    """
    values["docker_image"] = image
    # BUG FIX: the original assigned "IfNotPresent" and then immediately
    # overwrote it with "Always"; only the effective value is kept.
    values["image_pull_policy"] = "Always"
    # resource limits/requests for one Iluvatar BI-V100 GPU worker
    values["resources"] = {
        "limits": {
            "cpu": 4,
            "memory": "16Gi",
            "iluvatar.ai/gpu": "1",
        },
        "requests": {
            "cpu": 4,
            "memory": "16Gi",
            "iluvatar.ai/gpu": "1",
        },
    }
    # pin pods to Iluvatar BI-V100 nodes and tolerate their taint
    values["nodeSelector"] = {
        "contest.4pd.io/accelerator": "iluvatar-BI-V100"
    }
    values["tolerations"] = [
        {
            "key": "hosttype",
            "operator": "Equal",
            "value": "iluvatar",
            "effect": "NoSchedule",
        }
    ]
    # TODO: add smarter node-selection rules
    return values