init
This commit is contained in:
331
utils/helm.py
Normal file
331
utils/helm.py
Normal file
@@ -0,0 +1,331 @@
|
||||
import copy
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tarfile
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import requests
|
||||
from ruamel.yaml import YAML
|
||||
|
||||
from typing import Dict, Any
|
||||
|
||||
sut_chart_root = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
|
||||
)
|
||||
logger = logging.getLogger(__file__)
|
||||
lb_headers = (
|
||||
{"Authorization": "Bearer " + os.getenv("LEADERBOARD_API_TOKEN", "")}
|
||||
if os.getenv("LEADERBOARD_API_TOKEN")
|
||||
else None
|
||||
)
|
||||
pull_num: defaultdict = defaultdict()
|
||||
|
||||
JOB_ID = int(os.getenv("JOB_ID", "-1"))
|
||||
assert JOB_ID != -1
|
||||
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
|
||||
assert LOAD_SUT_URL is not None
|
||||
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
|
||||
assert GET_JOB_SUT_INFO_URL is not None
|
||||
|
||||
|
||||
def apply_env_to_values(values, envs):
|
||||
if "env" not in values:
|
||||
values["env"] = []
|
||||
old_key_list = [x["name"] for x in values["env"]]
|
||||
for k, v in envs.items():
|
||||
try:
|
||||
idx = old_key_list.index(k)
|
||||
values["env"][idx]["value"] = v
|
||||
except ValueError:
|
||||
values["env"].append({"name": k, "value": v})
|
||||
return values
|
||||
|
||||
|
||||
def merge_values(base_value, incr_value):
|
||||
if isinstance(base_value, dict) and isinstance(incr_value, dict):
|
||||
for k in incr_value:
|
||||
base_value[k] = (
|
||||
merge_values(base_value[k], incr_value[k])
|
||||
if k in base_value
|
||||
else incr_value[k]
|
||||
)
|
||||
elif isinstance(base_value, list) and isinstance(incr_value, list):
|
||||
base_value.extend(incr_value)
|
||||
else:
|
||||
base_value = incr_value
|
||||
return base_value
|
||||
|
||||
|
||||
def gen_chart_tarball(docker_image, policy=None):
|
||||
"""docker image加上digest并根据image生成helm chart包, 失败直接异常退出
|
||||
|
||||
Args:
|
||||
docker_image (_type_): docker image
|
||||
|
||||
Returns:
|
||||
tuple[BytesIO, dict]: [helm chart包file对象, values内容]
|
||||
"""
|
||||
# load values template
|
||||
with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
|
||||
yaml = YAML(typ="rt")
|
||||
values = yaml.load(fp)
|
||||
# update docker_image
|
||||
get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
|
||||
if get_image_hash_url is not None:
|
||||
# convert tag to hash for docker_image
|
||||
resp = requests.get(
|
||||
get_image_hash_url,
|
||||
headers=lb_headers,
|
||||
params={"image": docker_image},
|
||||
timeout=600,
|
||||
)
|
||||
assert resp.status_code == 200, (
|
||||
"Convert tag to hash for docker image failed, API retcode %d"
|
||||
% resp.status_code
|
||||
)
|
||||
resp = resp.json()
|
||||
assert resp[
|
||||
"success"
|
||||
], "Convert tag to hash for docker image failed, response: %s" % str(resp)
|
||||
token = resp["data"]["image"].rsplit(":", 2)
|
||||
assert len(token) == 3, "Invalid docker image %s" % resp["data"]["image"]
|
||||
values["image"]["repository"] = token[0]
|
||||
values["image"]["tag"] = ":".join(token[1:])
|
||||
else:
|
||||
token = docker_image.rsplit(":", 1)
|
||||
if len(token) != 2:
|
||||
raise RuntimeError("Invalid docker image %s" % docker_image)
|
||||
values["image"]["repository"] = token[0]
|
||||
values["image"]["tag"] = token[1]
|
||||
values["image"]["pullPolicy"] = policy
|
||||
# output values.yaml
|
||||
with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
|
||||
yaml = YAML(typ="rt")
|
||||
yaml.dump(values, fp)
|
||||
# tarball
|
||||
tarfp = io.BytesIO()
|
||||
with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
|
||||
tar.add(
|
||||
sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True
|
||||
)
|
||||
tarfp.seek(0)
|
||||
logger.debug(f"Generated chart using values: {values}")
|
||||
return tarfp, values
|
||||
|
||||
|
||||
def deploy_chart(
|
||||
name_suffix,
|
||||
readiness_timeout,
|
||||
chart_str=None,
|
||||
chart_fileobj=None,
|
||||
extra_values=None,
|
||||
restart_count_limit=3,
|
||||
pullimage_count_limit=3,
|
||||
):
|
||||
"""部署sut, 失败直接异常退出
|
||||
|
||||
Args:
|
||||
name_suffix (str): 同一个job有多个sut时, 区分不同sut的名称
|
||||
readiness_timeout (int): readiness超时时间, 单位s
|
||||
chart_str (int, optional): chart url, 不为None则忽略chart_fileobj. Defaults to None.
|
||||
chart_fileobj (BytesIO, optional): helm chart包file对象, chart_str不为None使用. Defaults to None.
|
||||
extra_values (dict, optional): helm values的补充内容. Defaults to None.
|
||||
restart_count_limit (int, optional): sut重启次数限制, 超出则异常退出. Defaults to 3.
|
||||
pullimage_count_limit (int, optional): image拉取次数限制, 超出则异常退出. Defaults to 3.
|
||||
|
||||
Returns:
|
||||
tuple[str, str]: [用于访问服务的k8s域名, 用于unload_sut的名称]
|
||||
"""
|
||||
logger.info(
|
||||
f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}"
|
||||
)
|
||||
# deploy
|
||||
payload = {
|
||||
"job_id": JOB_ID,
|
||||
"resource_name": name_suffix,
|
||||
"priorityclassname": os.environ.get("priorityclassname"),
|
||||
}
|
||||
extra_values = {} if not extra_values else extra_values
|
||||
payload["values"] = json.dumps(extra_values, ensure_ascii=False)
|
||||
if chart_str is not None:
|
||||
payload["helm_chart"] = chart_str
|
||||
resp = requests.post(
|
||||
LOAD_SUT_URL,
|
||||
data=payload,
|
||||
headers=lb_headers,
|
||||
timeout=600,
|
||||
)
|
||||
else:
|
||||
assert (
|
||||
chart_fileobj is not None
|
||||
), "Either chart_str or chart_fileobj should be set"
|
||||
resp = requests.post(
|
||||
LOAD_SUT_URL,
|
||||
data=payload,
|
||||
headers=lb_headers,
|
||||
files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
|
||||
timeout=600,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
raise RuntimeError(
|
||||
"Failed to deploy application status_code %d %s"
|
||||
% (resp.status_code, resp.text)
|
||||
)
|
||||
resp = resp.json()
|
||||
if not resp["success"]:
|
||||
raise RuntimeError("Failed to deploy application response %r" % resp)
|
||||
service_name = resp["data"]["service_name"]
|
||||
sut_name = resp["data"]["sut_name"]
|
||||
logger.info(f"SUT application deployed with service_name {service_name}")
|
||||
# waiting for appliation ready
|
||||
running_at = None
|
||||
while True:
|
||||
retry_interval = 10
|
||||
logger.info(
|
||||
f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready..."
|
||||
)
|
||||
time.sleep(retry_interval)
|
||||
check_result, running_at = check_sut_ready_from_resp(
|
||||
service_name,
|
||||
running_at,
|
||||
readiness_timeout,
|
||||
restart_count_limit,
|
||||
pullimage_count_limit,
|
||||
)
|
||||
if check_result:
|
||||
break
|
||||
|
||||
logger.info(
|
||||
f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}"
|
||||
)
|
||||
return service_name, sut_name
|
||||
|
||||
|
||||
def check_sut_ready_from_resp(
|
||||
service_name,
|
||||
running_at,
|
||||
readiness_timeout,
|
||||
restart_count_limit,
|
||||
pullimage_count_limit,
|
||||
):
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
|
||||
headers=lb_headers,
|
||||
params={"with_detail": True},
|
||||
timeout=600,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Exception occured while getting SUT application {service_name} status", e
|
||||
)
|
||||
return False, running_at
|
||||
if resp.status_code != 200:
|
||||
logger.warning(
|
||||
f"Get SUT application {service_name} status failed with status_code {resp.status_code}"
|
||||
)
|
||||
return False, running_at
|
||||
resp = resp.json()
|
||||
if not resp["success"]:
|
||||
logger.warning(
|
||||
f"Get SUT application {service_name} status failed with response {resp}"
|
||||
)
|
||||
return False, running_at
|
||||
if len(resp["data"]["sut"]) == 0:
|
||||
logger.warning("Empty SUT application status")
|
||||
return False, running_at
|
||||
resp_data_sut = copy.deepcopy(resp["data"]["sut"])
|
||||
for status in resp_data_sut:
|
||||
del status["detail"]
|
||||
logger.info(f"Got SUT application status: {resp_data_sut}")
|
||||
for status in resp["data"]["sut"]:
|
||||
if status["phase"] in ["Succeeded", "Failed"]:
|
||||
raise RuntimeError(
|
||||
f"Some pods of SUT application {service_name} terminated with status {status}"
|
||||
)
|
||||
elif status["phase"] in ["Pending", "Unknown"]:
|
||||
return False, running_at
|
||||
elif status["phase"] != "Running":
|
||||
raise RuntimeError(
|
||||
f"Unexcepted pod status {status} of SUT application {service_name}"
|
||||
)
|
||||
if running_at is None:
|
||||
running_at = time.time()
|
||||
for ct in status["detail"]["status"]["container_statuses"]:
|
||||
if ct["restart_count"] > 0:
|
||||
logger.info(
|
||||
f"pod {status['pod_name']} restart count = {ct['restart_count']}"
|
||||
)
|
||||
if ct["restart_count"] > restart_count_limit:
|
||||
raise RuntimeError(
|
||||
f"pod {status['pod_name']} restart too many times(over {restart_count_limit})"
|
||||
)
|
||||
if (
|
||||
ct["state"]["waiting"] is not None
|
||||
and "reason" in ct["state"]["waiting"]
|
||||
and ct["state"]["waiting"]["reason"]
|
||||
in ["ImagePullBackOff", "ErrImagePull"]
|
||||
):
|
||||
pull_num[status["pod_name"]] += 1
|
||||
logger.info(
|
||||
"pod %s has {pull_num[status['pod_name']]} times inspect pulling image info: %s"
|
||||
% (status["pod_name"], ct["state"]["waiting"])
|
||||
)
|
||||
if pull_num[status["pod_name"]] > pullimage_count_limit:
|
||||
raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
|
||||
if not status["conditions"]["Ready"]:
|
||||
if running_at is not None and time.time() - running_at > readiness_timeout:
|
||||
raise RuntimeError(
|
||||
f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s"
|
||||
)
|
||||
return False, running_at
|
||||
return True, running_at
|
||||
|
||||
|
||||
def resource_check(values: Dict[str, Any], image: str):
|
||||
# 补充镜像信息。
|
||||
values["docker_image"] = image
|
||||
values["image_pull_policy"] = "IfNotPresent"
|
||||
values["image_pull_policy"] = "Always"
|
||||
# values["command"] = ["python", "run.py"]
|
||||
# 补充resources限制
|
||||
values["resources"] = {
|
||||
"limits": {
|
||||
"cpu": 4,
|
||||
"memory": "16Gi",
|
||||
"iluvatar.ai/gpu": "1"
|
||||
},
|
||||
"requests": {
|
||||
"cpu": 4,
|
||||
"memory": "16Gi",
|
||||
"iluvatar.ai/gpu": "1"
|
||||
},
|
||||
}
|
||||
|
||||
values["nodeSelector"] = {
|
||||
"contest.4pd.io/accelerator": "iluvatar-BI-V100"
|
||||
}
|
||||
values["tolerations"] = [
|
||||
{
|
||||
"key": "hosttype",
|
||||
"operator": "Equal",
|
||||
"value": "iluvatar",
|
||||
"effect": "NoSchedule",
|
||||
}
|
||||
]
|
||||
|
||||
"""
|
||||
nodeSelector:
|
||||
contest.4pd.io/accelerator: iluvatar-BI-V100
|
||||
tolerations:
|
||||
- key: hosttype
|
||||
operator: Equal
|
||||
value: iluvatar
|
||||
effect: NoSchedule
|
||||
"""
|
||||
|
||||
# TODO 补充选择规则
|
||||
return values
|
||||
Reference in New Issue
Block a user