332 lines
13 KiB
Python
332 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
import copy
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import tarfile
|
|
import time
|
|
from collections import defaultdict
|
|
from typing import Any, Dict, Optional
|
|
|
|
import requests
|
|
from ruamel.yaml import YAML
|
|
|
|
from utils.logger import logger
|
|
|
|
sut_chart_root = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut")
|
|
headers = (
|
|
{'Authorization': 'Bearer ' + os.getenv("LEADERBOARD_API_TOKEN")} if os.getenv("LEADERBOARD_API_TOKEN") else None
|
|
)
|
|
pull_num: defaultdict = defaultdict()
|
|
JOB_ID = int(os.getenv("JOB_ID", "-1"))
|
|
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
|
|
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
|
|
|
|
|
|
def apply_env_to_values(values, envs):
|
|
if "env" not in values:
|
|
values["env"] = []
|
|
old_key_list = [x["name"] for x in values["env"]]
|
|
for k, v in envs.items():
|
|
try:
|
|
idx = old_key_list.index(k)
|
|
values["env"][idx]["value"] = v
|
|
except ValueError:
|
|
values["env"].append({"name": k, "value": v})
|
|
return values
|
|
|
|
|
|
def merge_values(base_value, incr_value):
|
|
if isinstance(base_value, dict) and isinstance(incr_value, dict):
|
|
for k in incr_value:
|
|
base_value[k] = merge_values(base_value[k], incr_value[k]) if k in base_value else incr_value[k]
|
|
elif isinstance(base_value, list) and isinstance(incr_value, list):
|
|
base_value.extend(incr_value)
|
|
else:
|
|
base_value = incr_value
|
|
return base_value
|
|
|
|
|
|
def gen_chart_tarball(docker_image):
|
|
"""docker image加上digest并根据image生成helm chart包, 失败直接异常退出
|
|
|
|
Args:
|
|
docker_image (_type_): docker image
|
|
|
|
Returns:
|
|
tuple[BytesIO, dict]: [helm chart包file对象, values内容]
|
|
"""
|
|
# load values template
|
|
with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
|
|
yaml = YAML(typ="rt")
|
|
values = yaml.load(fp)
|
|
# update docker_image
|
|
get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
|
|
logger.info(f"get_image_hash_url: {get_image_hash_url}")
|
|
if get_image_hash_url is not None:
|
|
# convert tag to hash for docker_image
|
|
#docker_image = "harbor-contest.4pd.io/zhoushasha/speaker_identification:wo_model_v0"
|
|
docker_image = "harbor-contest.4pd.io/zhoushasha/image_classification:wo_model_v3"
|
|
resp = requests.get(get_image_hash_url, headers=headers, params={"image": docker_image}, timeout=600)
|
|
|
|
logger.info(f"resp.text: {resp.text}")
|
|
assert resp.status_code == 200, "Convert tag to hash for docker image failed, API retcode %d" % resp.status_code
|
|
resp = resp.json()
|
|
assert resp["success"], "Convert tag to hash for docker image failed, response: %s" % str(resp)
|
|
token = resp["data"]["image"].rsplit(":", 2)
|
|
assert len(token) == 3, "Invalid docker image %s" % resp["data"]["image"]
|
|
values["image"]["repository"] = token[0]
|
|
values["image"]["tag"] = ":".join(token[1:])
|
|
else:
|
|
token = docker_image.rsplit(":", 1)
|
|
if len(token) != 2:
|
|
raise RuntimeError("Invalid docker image %s" % docker_image)
|
|
values["image"]["repository"] = token[0]
|
|
values["image"]["tag"] = token[1]
|
|
# output values.yaml
|
|
with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
|
|
yaml = YAML(typ="rt")
|
|
yaml.dump(values, fp)
|
|
# tarball
|
|
tarfp = io.BytesIO()
|
|
with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
|
|
tar.add(sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True)
|
|
tarfp.seek(0)
|
|
logger.debug(f"Generated chart using values: {values}")
|
|
return tarfp, values
|
|
|
|
|
|
def deploy_chart(
|
|
name_suffix,
|
|
readiness_timeout,
|
|
chart_str=None,
|
|
chart_fileobj=None,
|
|
extra_values=None,
|
|
restart_count_limit=3,
|
|
pullimage_count_limit=3,
|
|
):
|
|
"""部署sut, 失败直接异常退出
|
|
|
|
Args:
|
|
name_suffix (str): 同一个job有多个sut时, 区分不同sut的名称
|
|
readiness_timeout (int): readiness超时时间, 单位s
|
|
chart_str (int, optional): chart url, 不为None则忽略chart_fileobj. Defaults to None.
|
|
chart_fileobj (BytesIO, optional): helm chart包file对象, chart_str不为None使用. Defaults to None.
|
|
extra_values (dict, optional): helm values的补充内容. Defaults to None.
|
|
restart_count_limit (int, optional): sut重启次数限制, 超出则异常退出. Defaults to 3.
|
|
pullimage_count_limit (int, optional): image拉取次数限制, 超出则异常退出. Defaults to 3.
|
|
|
|
Returns:
|
|
tuple[str, str]: [用于访问服务的k8s域名, 用于unload_sut的名称]
|
|
"""
|
|
logger.info(f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}")
|
|
# deploy
|
|
payload = {
|
|
"job_id": JOB_ID,
|
|
"resource_name": name_suffix,
|
|
"priorityclassname": os.environ.get("priorityclassname"),
|
|
}
|
|
extra_values = {} if not extra_values else extra_values
|
|
payload["values"] = json.dumps(extra_values, ensure_ascii=False)
|
|
if chart_str is not None:
|
|
payload["helm_chart"] = chart_str
|
|
resp = requests.post(LOAD_SUT_URL, data=payload, headers=headers, timeout=600)
|
|
else:
|
|
assert chart_fileobj is not None, "Either chart_str or chart_fileobj should be set"
|
|
|
|
logger.info(f"LOAD_SUT_URL: {LOAD_SUT_URL}")
|
|
logger.info(f"payload: {payload}")
|
|
logger.info(f"headers: {headers}")
|
|
|
|
resp = requests.post(
|
|
LOAD_SUT_URL,
|
|
data=payload,
|
|
headers=headers,
|
|
files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
|
|
timeout=600,
|
|
)
|
|
|
|
|
|
if resp.status_code != 200:
|
|
raise RuntimeError("Failed to deploy application status_code %d %s" % (resp.status_code, resp.text))
|
|
resp = resp.json()
|
|
if not resp["success"]:
|
|
logger.error("Failed to deploy application response %r", resp)
|
|
service_name = resp["data"]["service_name"]
|
|
sut_name = resp["data"]["sut_name"]
|
|
logger.info(f"SUT application deployed with service_name {service_name}")
|
|
# waiting for appliation ready
|
|
running_at = None
|
|
retry_count = 0
|
|
while True:
|
|
retry_interval = 10
|
|
if retry_count % 20 == 19:
|
|
retry_count += 1
|
|
logger.info(f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready...")
|
|
logger.info("20 retrys log this message again.")
|
|
time.sleep(retry_interval)
|
|
check_result, running_at = check_sut_ready_from_resp(
|
|
service_name,
|
|
running_at,
|
|
readiness_timeout,
|
|
restart_count_limit,
|
|
pullimage_count_limit,
|
|
)
|
|
if check_result:
|
|
break
|
|
|
|
logger.info(f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}")
|
|
return service_name, sut_name
|
|
|
|
|
|
def check_sut_ready_from_resp(
|
|
service_name,
|
|
running_at,
|
|
readiness_timeout,
|
|
restart_count_limit,
|
|
pullimage_count_limit,
|
|
):
|
|
try:
|
|
resp = requests.get(
|
|
f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
|
|
headers=headers,
|
|
params={"with_detail": True},
|
|
timeout=600,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Exception occured while getting SUT application {service_name} status", e)
|
|
return False, running_at
|
|
if resp.status_code != 200:
|
|
logger.warning(f"Get SUT application {service_name} status failed with status_code {resp.status_code}")
|
|
return False, running_at
|
|
resp = resp.json()
|
|
if not resp["success"]:
|
|
logger.warning(f"Get SUT application {service_name} status failed with response {resp}")
|
|
return False, running_at
|
|
if len(resp["data"]["sut"]) == 0:
|
|
logger.warning("Empty SUT application status")
|
|
return False, running_at
|
|
resp_data_sut = copy.deepcopy(resp["data"]["sut"])
|
|
for status in resp_data_sut:
|
|
del status["detail"]
|
|
logger.info(f"Got SUT application status: {resp_data_sut}")
|
|
for status in resp["data"]["sut"]:
|
|
if status["phase"] in ["Succeeded", "Failed"]:
|
|
raise RuntimeError(f"Some pods of SUT application {service_name} terminated with status {status}")
|
|
elif status["phase"] in ["Pending", "Unknown"]:
|
|
return False, running_at
|
|
elif status["phase"] != "Running":
|
|
raise RuntimeError(f"Unexcepted pod status {status} of SUT application {service_name}")
|
|
if running_at is None:
|
|
running_at = time.time()
|
|
for ct in status["detail"]["status"]["container_statuses"]:
|
|
if ct["restart_count"] > 0:
|
|
logger.info(f"pod {status['pod_name']} restart count = {ct['restart_count']}")
|
|
if ct["restart_count"] > restart_count_limit:
|
|
raise RuntimeError(f"pod {status['pod_name']} restart too many times(over {restart_count_limit})")
|
|
if (
|
|
ct["state"]["waiting"] is not None
|
|
and "reason" in ct["state"]["waiting"]
|
|
and ct["state"]["waiting"]["reason"] in ["ImagePullBackOff", "ErrImagePull"]
|
|
):
|
|
pull_num[status["pod_name"]] += 1
|
|
logger.info(
|
|
"pod %s has {pull_num[status['pod_name']]} times inspect pulling image info: %s"
|
|
% (status["pod_name"], ct["state"]["waiting"])
|
|
)
|
|
if pull_num[status["pod_name"]] > pullimage_count_limit:
|
|
raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
|
|
if not status["conditions"]["Ready"]:
|
|
if running_at is not None and time.time() - running_at > readiness_timeout:
|
|
raise RuntimeError(f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s")
|
|
return False, running_at
|
|
return True, running_at
|
|
|
|
|
|
def parse_resource(resource):
|
|
if resource == -1:
|
|
return -1
|
|
match = re.match(r"([\d\.]+)([mKMGTPENi]*)", resource)
|
|
value, unit = match.groups()
|
|
value = float(value)
|
|
unit_mapping = {
|
|
"": 1,
|
|
"m": 1e-3,
|
|
"K": 1e3,
|
|
"M": 1e6,
|
|
"G": 1e9,
|
|
"T": 1e12,
|
|
"P": 1e15,
|
|
"E": 1e18,
|
|
"Ki": 2**10,
|
|
"Mi": 2**20,
|
|
"Gi": 2**30,
|
|
"Ti": 2**40,
|
|
"Pi": 2**50,
|
|
"Ei": 2**60,
|
|
}
|
|
if unit not in unit_mapping:
|
|
raise ValueError(f"Unknown resources unit: {unit}")
|
|
return value * unit_mapping[unit]
|
|
|
|
|
|
def limit_resources(resource):
|
|
if "limits" not in resource:
|
|
return resource
|
|
if "cpu" in resource["limits"]:
|
|
cpu_limit = parse_resource(resource["limits"]["cpu"])
|
|
if cpu_limit > 30:
|
|
logger.error("CPU limit exceeded. Adjusting to 30 cores.")
|
|
resource["limits"]["cpu"] = "30"
|
|
if "memory" in resource["limits"]:
|
|
memory_limit = parse_resource(resource["limits"]["memory"])
|
|
if memory_limit > 100 * 2**30:
|
|
logger.error("Memory limit exceeded, adjusting to 100Gi")
|
|
resource["limits"]["memory"] = "100Gi"
|
|
|
|
|
|
def consistent_resources(resource):
|
|
if "limits" not in resource and "requests" not in resource:
|
|
return resource
|
|
elif "limits" in resource:
|
|
resource["requests"] = resource["limits"]
|
|
else:
|
|
resource["limits"] = resource["requests"]
|
|
return resource
|
|
|
|
|
|
def resource_check(values: Dict[str, Any]):
|
|
resources = values.get("resources", {}).get("limits", {})
|
|
if "nvidia.com/gpu" in resources and int(resources["nvidia.com/gpu"]) > 0:
|
|
values["resources"]["limits"]["nvidia.com/gpumem"] = 8192
|
|
values["resources"]["limits"]["nvidia.com/gpucores"] = 10
|
|
values["resources"]["requests"] = values["resources"].get("requests", {})
|
|
if "cpu" not in values["resources"]["requests"] and "cpu" in values["resources"]["limits"]:
|
|
values["resources"]["requests"]["cpu"] = values["resources"]["limits"]["cpu"]
|
|
if "memory" not in values["resources"]["requests"] and "memory" in values["resources"]["limits"]:
|
|
values["resources"]["requests"]["memory"] = values["resources"]["limits"]["memory"]
|
|
values["resources"]["requests"]["nvidia.com/gpu"] = values["resources"]["limits"]["nvidia.com/gpu"]
|
|
values["resources"]["requests"]["nvidia.com/gpumem"] = 8192
|
|
values["resources"]["requests"]["nvidia.com/gpucores"] = 10
|
|
|
|
values["nodeSelector"] = values.get("nodeSelector", {})
|
|
if "contest.4pd.io/accelerator" not in values["nodeSelector"]:
|
|
values["nodeSelector"]["contest.4pd.io/accelerator"] = "A100-SXM4-80GBvgpu"
|
|
gpu_type = values["nodeSelector"]["contest.4pd.io/accelerator"]
|
|
gpu_num = resources["nvidia.com/gpu"]
|
|
if gpu_type != "A100-SXM4-80GBvgpu":
|
|
raise RuntimeError("GPU类型只能为A100-SXM4-80GBvgpu")
|
|
if gpu_num != 1:
|
|
raise RuntimeError("GPU个数只能为1")
|
|
values["tolerations"] = values.get("tolerations", [])
|
|
values["tolerations"].append(
|
|
{
|
|
"key": "hosttype",
|
|
"operator": "Equal",
|
|
"value": "vgpu",
|
|
"effect": "NoSchedule",
|
|
}
|
|
)
|
|
return values
|