332 lines
11 KiB
Python
332 lines
11 KiB
Python
import copy
|
|
import io
|
|
import json
|
|
import logging
|
|
import os
|
|
import tarfile
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
import requests
|
|
from ruamel.yaml import YAML
|
|
|
|
from typing import Dict, Any
|
|
|
|
sut_chart_root = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
|
|
)
|
|
logger = logging.getLogger(__file__)
|
|
lb_headers = (
|
|
{"Authorization": "Bearer " + os.getenv("LEADERBOARD_API_TOKEN", "")}
|
|
if os.getenv("LEADERBOARD_API_TOKEN")
|
|
else None
|
|
)
|
|
pull_num: defaultdict = defaultdict()
|
|
|
|
JOB_ID = int(os.getenv("JOB_ID", "-1"))
|
|
assert JOB_ID != -1
|
|
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
|
|
assert LOAD_SUT_URL is not None
|
|
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
|
|
assert GET_JOB_SUT_INFO_URL is not None
|
|
|
|
|
|
def apply_env_to_values(values, envs):
|
|
if "env" not in values:
|
|
values["env"] = []
|
|
old_key_list = [x["name"] for x in values["env"]]
|
|
for k, v in envs.items():
|
|
try:
|
|
idx = old_key_list.index(k)
|
|
values["env"][idx]["value"] = v
|
|
except ValueError:
|
|
values["env"].append({"name": k, "value": v})
|
|
return values
|
|
|
|
|
|
def merge_values(base_value, incr_value):
|
|
if isinstance(base_value, dict) and isinstance(incr_value, dict):
|
|
for k in incr_value:
|
|
base_value[k] = (
|
|
merge_values(base_value[k], incr_value[k])
|
|
if k in base_value
|
|
else incr_value[k]
|
|
)
|
|
elif isinstance(base_value, list) and isinstance(incr_value, list):
|
|
base_value.extend(incr_value)
|
|
else:
|
|
base_value = incr_value
|
|
return base_value
|
|
|
|
|
|
def gen_chart_tarball(docker_image, policy=None):
|
|
"""docker image加上digest并根据image生成helm chart包, 失败直接异常退出
|
|
|
|
Args:
|
|
docker_image (_type_): docker image
|
|
|
|
Returns:
|
|
tuple[BytesIO, dict]: [helm chart包file对象, values内容]
|
|
"""
|
|
# load values template
|
|
with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
|
|
yaml = YAML(typ="rt")
|
|
values = yaml.load(fp)
|
|
# update docker_image
|
|
get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
|
|
if get_image_hash_url is not None:
|
|
# convert tag to hash for docker_image
|
|
resp = requests.get(
|
|
get_image_hash_url,
|
|
headers=lb_headers,
|
|
params={"image": docker_image},
|
|
timeout=600,
|
|
)
|
|
assert resp.status_code == 200, (
|
|
"Convert tag to hash for docker image failed, API retcode %d"
|
|
% resp.status_code
|
|
)
|
|
resp = resp.json()
|
|
assert resp[
|
|
"success"
|
|
], "Convert tag to hash for docker image failed, response: %s" % str(resp)
|
|
token = resp["data"]["image"].rsplit(":", 2)
|
|
assert len(token) == 3, "Invalid docker image %s" % resp["data"]["image"]
|
|
values["image"]["repository"] = token[0]
|
|
values["image"]["tag"] = ":".join(token[1:])
|
|
else:
|
|
token = docker_image.rsplit(":", 1)
|
|
if len(token) != 2:
|
|
raise RuntimeError("Invalid docker image %s" % docker_image)
|
|
values["image"]["repository"] = token[0]
|
|
values["image"]["tag"] = token[1]
|
|
values["image"]["pullPolicy"] = policy
|
|
# output values.yaml
|
|
with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
|
|
yaml = YAML(typ="rt")
|
|
yaml.dump(values, fp)
|
|
# tarball
|
|
tarfp = io.BytesIO()
|
|
with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
|
|
tar.add(
|
|
sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True
|
|
)
|
|
tarfp.seek(0)
|
|
logger.debug(f"Generated chart using values: {values}")
|
|
return tarfp, values
|
|
|
|
|
|
def deploy_chart(
|
|
name_suffix,
|
|
readiness_timeout,
|
|
chart_str=None,
|
|
chart_fileobj=None,
|
|
extra_values=None,
|
|
restart_count_limit=3,
|
|
pullimage_count_limit=3,
|
|
):
|
|
"""部署sut, 失败直接异常退出
|
|
|
|
Args:
|
|
name_suffix (str): 同一个job有多个sut时, 区分不同sut的名称
|
|
readiness_timeout (int): readiness超时时间, 单位s
|
|
chart_str (int, optional): chart url, 不为None则忽略chart_fileobj. Defaults to None.
|
|
chart_fileobj (BytesIO, optional): helm chart包file对象, chart_str不为None使用. Defaults to None.
|
|
extra_values (dict, optional): helm values的补充内容. Defaults to None.
|
|
restart_count_limit (int, optional): sut重启次数限制, 超出则异常退出. Defaults to 3.
|
|
pullimage_count_limit (int, optional): image拉取次数限制, 超出则异常退出. Defaults to 3.
|
|
|
|
Returns:
|
|
tuple[str, str]: [用于访问服务的k8s域名, 用于unload_sut的名称]
|
|
"""
|
|
logger.info(
|
|
f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}"
|
|
)
|
|
# deploy
|
|
payload = {
|
|
"job_id": JOB_ID,
|
|
"resource_name": name_suffix,
|
|
"priorityclassname": os.environ.get("priorityclassname"),
|
|
}
|
|
extra_values = {} if not extra_values else extra_values
|
|
payload["values"] = json.dumps(extra_values, ensure_ascii=False)
|
|
if chart_str is not None:
|
|
payload["helm_chart"] = chart_str
|
|
resp = requests.post(
|
|
LOAD_SUT_URL,
|
|
data=payload,
|
|
headers=lb_headers,
|
|
timeout=600,
|
|
)
|
|
else:
|
|
assert (
|
|
chart_fileobj is not None
|
|
), "Either chart_str or chart_fileobj should be set"
|
|
resp = requests.post(
|
|
LOAD_SUT_URL,
|
|
data=payload,
|
|
headers=lb_headers,
|
|
files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
|
|
timeout=600,
|
|
)
|
|
if resp.status_code != 200:
|
|
raise RuntimeError(
|
|
"Failed to deploy application status_code %d %s"
|
|
% (resp.status_code, resp.text)
|
|
)
|
|
resp = resp.json()
|
|
if not resp["success"]:
|
|
raise RuntimeError("Failed to deploy application response %r" % resp)
|
|
service_name = resp["data"]["service_name"]
|
|
sut_name = resp["data"]["sut_name"]
|
|
logger.info(f"SUT application deployed with service_name {service_name}")
|
|
# waiting for appliation ready
|
|
running_at = None
|
|
while True:
|
|
retry_interval = 10
|
|
logger.info(
|
|
f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready..."
|
|
)
|
|
time.sleep(retry_interval)
|
|
check_result, running_at = check_sut_ready_from_resp(
|
|
service_name,
|
|
running_at,
|
|
readiness_timeout,
|
|
restart_count_limit,
|
|
pullimage_count_limit,
|
|
)
|
|
if check_result:
|
|
break
|
|
|
|
logger.info(
|
|
f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}"
|
|
)
|
|
return service_name, sut_name
|
|
|
|
|
|
def check_sut_ready_from_resp(
|
|
service_name,
|
|
running_at,
|
|
readiness_timeout,
|
|
restart_count_limit,
|
|
pullimage_count_limit,
|
|
):
|
|
try:
|
|
resp = requests.get(
|
|
f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
|
|
headers=lb_headers,
|
|
params={"with_detail": True},
|
|
timeout=600,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Exception occured while getting SUT application {service_name} status", e
|
|
)
|
|
return False, running_at
|
|
if resp.status_code != 200:
|
|
logger.warning(
|
|
f"Get SUT application {service_name} status failed with status_code {resp.status_code}"
|
|
)
|
|
return False, running_at
|
|
resp = resp.json()
|
|
if not resp["success"]:
|
|
logger.warning(
|
|
f"Get SUT application {service_name} status failed with response {resp}"
|
|
)
|
|
return False, running_at
|
|
if len(resp["data"]["sut"]) == 0:
|
|
logger.warning("Empty SUT application status")
|
|
return False, running_at
|
|
resp_data_sut = copy.deepcopy(resp["data"]["sut"])
|
|
for status in resp_data_sut:
|
|
del status["detail"]
|
|
logger.info(f"Got SUT application status: {resp_data_sut}")
|
|
for status in resp["data"]["sut"]:
|
|
if status["phase"] in ["Succeeded", "Failed"]:
|
|
raise RuntimeError(
|
|
f"Some pods of SUT application {service_name} terminated with status {status}"
|
|
)
|
|
elif status["phase"] in ["Pending", "Unknown"]:
|
|
return False, running_at
|
|
elif status["phase"] != "Running":
|
|
raise RuntimeError(
|
|
f"Unexcepted pod status {status} of SUT application {service_name}"
|
|
)
|
|
if running_at is None:
|
|
running_at = time.time()
|
|
for ct in status["detail"]["status"]["container_statuses"]:
|
|
if ct["restart_count"] > 0:
|
|
logger.info(
|
|
f"pod {status['pod_name']} restart count = {ct['restart_count']}"
|
|
)
|
|
if ct["restart_count"] > restart_count_limit:
|
|
raise RuntimeError(
|
|
f"pod {status['pod_name']} restart too many times(over {restart_count_limit})"
|
|
)
|
|
if (
|
|
ct["state"]["waiting"] is not None
|
|
and "reason" in ct["state"]["waiting"]
|
|
and ct["state"]["waiting"]["reason"]
|
|
in ["ImagePullBackOff", "ErrImagePull"]
|
|
):
|
|
pull_num[status["pod_name"]] += 1
|
|
logger.info(
|
|
"pod %s has {pull_num[status['pod_name']]} times inspect pulling image info: %s"
|
|
% (status["pod_name"], ct["state"]["waiting"])
|
|
)
|
|
if pull_num[status["pod_name"]] > pullimage_count_limit:
|
|
raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
|
|
if not status["conditions"]["Ready"]:
|
|
if running_at is not None and time.time() - running_at > readiness_timeout:
|
|
raise RuntimeError(
|
|
f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s"
|
|
)
|
|
return False, running_at
|
|
return True, running_at
|
|
|
|
|
|
def resource_check(values: Dict[str, Any], image: str):
|
|
# 补充镜像信息。
|
|
values["docker_image"] = image
|
|
values["image_pull_policy"] = "IfNotPresent"
|
|
values["image_pull_policy"] = "Always"
|
|
# values["command"] = ["python", "run.py"]
|
|
# 补充resources限制
|
|
values["resources"] = {
|
|
"limits": {
|
|
"cpu": 4,
|
|
"memory": "16Gi",
|
|
"iluvatar.ai/gpu": "1"
|
|
},
|
|
"requests": {
|
|
"cpu": 4,
|
|
"memory": "16Gi",
|
|
"iluvatar.ai/gpu": "1"
|
|
},
|
|
}
|
|
|
|
values["nodeSelector"] = {
|
|
"contest.4pd.io/accelerator": "iluvatar-BI-V100"
|
|
}
|
|
values["tolerations"] = [
|
|
{
|
|
"key": "hosttype",
|
|
"operator": "Equal",
|
|
"value": "iluvatar",
|
|
"effect": "NoSchedule",
|
|
}
|
|
]
|
|
|
|
"""
|
|
nodeSelector:
|
|
contest.4pd.io/accelerator: iluvatar-BI-V100
|
|
tolerations:
|
|
- key: hosttype
|
|
operator: Equal
|
|
value: iluvatar
|
|
effect: NoSchedule
|
|
"""
|
|
|
|
# TODO 补充选择规则
|
|
return values
|