Files
zhousha 55a67e817e update
2025-08-06 15:38:55 +08:00

332 lines
13 KiB
Python

# -*- coding: utf-8 -*-
import copy
import io
import json
import os
import re
import tarfile
import time
from collections import defaultdict
from typing import Any, Dict, Optional
import requests
from ruamel.yaml import YAML
from utils.logger import logger
# Root of the SUT helm chart template directory, resolved relative to this file.
sut_chart_root = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut")
# Bearer-token auth header for the leaderboard APIs; None when no token is configured.
headers = (
    {'Authorization': 'Bearer ' + os.getenv("LEADERBOARD_API_TOKEN")} if os.getenv("LEADERBOARD_API_TOKEN") else None
)
# Per-pod image-pull failure counter. BUG FIX: the original `defaultdict()` had
# no default factory, so `pull_num[pod] += 1` raised KeyError on first access;
# `int` makes missing keys start at 0.
pull_num: defaultdict = defaultdict(int)
# Job / service endpoints taken from the environment; JOB_ID defaults to -1 when unset.
JOB_ID = int(os.getenv("JOB_ID", "-1"))
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
def apply_env_to_values(values, envs):
    """Overlay *envs* onto the ``env`` list of a helm values dict.

    Entries whose ``name`` already exists are updated in place; unknown
    names are appended as new ``{"name": ..., "value": ...}`` items.
    Returns the (mutated) *values* dict.
    """
    env_list = values.setdefault("env", [])
    # Map each pre-existing name to its first position in the list.
    positions = {}
    for i, entry in enumerate(env_list):
        positions.setdefault(entry["name"], i)
    for name, value in envs.items():
        if name in positions:
            env_list[positions[name]]["value"] = value
        else:
            env_list.append({"name": name, "value": value})
    return values
def merge_values(base_value, incr_value):
    """Recursively merge *incr_value* into *base_value*.

    Dicts are merged key-by-key (recursing on shared keys), lists are
    concatenated, and anything else is replaced by the incoming value.
    Mutates *base_value* where possible and returns the merged result.
    """
    if isinstance(base_value, dict) and isinstance(incr_value, dict):
        for key, incoming in incr_value.items():
            if key in base_value:
                base_value[key] = merge_values(base_value[key], incoming)
            else:
                base_value[key] = incoming
        return base_value
    if isinstance(base_value, list) and isinstance(incr_value, list):
        base_value += incr_value
        return base_value
    # Mismatched or scalar types: the increment wins outright.
    return incr_value
def gen_chart_tarball(docker_image):
    """Resolve *docker_image* to a digest (when possible) and package the SUT
    helm chart for it. Raises on any failure.

    Args:
        docker_image (str): docker image reference ("repository:tag")

    Returns:
        tuple[BytesIO, dict]: (gzipped helm chart tarball file object, rendered values)
    """
    # Load the values template shipped with the chart.
    with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
        yaml = YAML(typ="rt")
        values = yaml.load(fp)
    # Resolve the mutable tag to a content digest when the hash API is
    # configured, so the deployed image cannot change underneath the job.
    get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
    logger.info(f"get_image_hash_url: {get_image_hash_url}")
    if get_image_hash_url is not None:
        # BUG FIX: removed a leftover hard-coded debug assignment that
        # silently replaced the caller-supplied docker_image on this path.
        resp = requests.get(get_image_hash_url, headers=headers, params={"image": docker_image}, timeout=600)
        logger.info(f"resp.text: {resp.text}")
        assert resp.status_code == 200, "Convert tag to hash for docker image failed, API retcode %d" % resp.status_code
        resp = resp.json()
        assert resp["success"], "Convert tag to hash for docker image failed, response: %s" % str(resp)
        # Digest form is "repo:tag@sha256:..." -> rsplit(":", 2) yields 3 tokens.
        token = resp["data"]["image"].rsplit(":", 2)
        assert len(token) == 3, "Invalid docker image %s" % resp["data"]["image"]
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = ":".join(token[1:])
    else:
        token = docker_image.rsplit(":", 1)
        if len(token) != 2:
            raise RuntimeError("Invalid docker image %s" % docker_image)
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = token[1]
    # Render the final values.yaml next to the chart template.
    with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
        yaml = YAML(typ="rt")
        yaml.dump(values, fp)
    # Package the whole chart directory as an in-memory gzipped tarball.
    tarfp = io.BytesIO()
    with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
        tar.add(sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True)
    tarfp.seek(0)
    logger.debug(f"Generated chart using values: {values}")
    return tarfp, values
def deploy_chart(
    name_suffix,
    readiness_timeout,
    chart_str=None,
    chart_fileobj=None,
    extra_values=None,
    restart_count_limit=3,
    pullimage_count_limit=3,
):
    """Deploy a SUT and block until it is ready. Raises on any failure.

    Args:
        name_suffix (str): distinguishes multiple SUTs within the same job
        readiness_timeout (int): readiness timeout in seconds
        chart_str (str, optional): chart url; when not None, chart_fileobj is
            ignored. Defaults to None.
        chart_fileobj (BytesIO, optional): helm chart tarball file object,
            used when chart_str is None. Defaults to None.
        extra_values (dict, optional): extra helm values. Defaults to None.
        restart_count_limit (int, optional): max SUT restarts before aborting.
            Defaults to 3.
        pullimage_count_limit (int, optional): max image pulls before
            aborting. Defaults to 3.

    Returns:
        tuple[str, str]: (k8s service name for reaching the SUT, sut name for unload_sut)
    """
    logger.info(f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}")
    # Submit the deployment request.
    payload = {
        "job_id": JOB_ID,
        "resource_name": name_suffix,
        "priorityclassname": os.environ.get("priorityclassname"),
    }
    extra_values = {} if not extra_values else extra_values
    payload["values"] = json.dumps(extra_values, ensure_ascii=False)
    if chart_str is not None:
        payload["helm_chart"] = chart_str
        resp = requests.post(LOAD_SUT_URL, data=payload, headers=headers, timeout=600)
    else:
        assert chart_fileobj is not None, "Either chart_str or chart_fileobj should be set"
        logger.info(f"LOAD_SUT_URL: {LOAD_SUT_URL}")
        logger.info(f"payload: {payload}")
        logger.info(f"headers: {headers}")
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=headers,
            files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
            timeout=600,
        )
    if resp.status_code != 200:
        raise RuntimeError("Failed to deploy application status_code %d %s" % (resp.status_code, resp.text))
    resp = resp.json()
    if not resp["success"]:
        # BUG FIX: previously only logged the failure and fell through to the
        # resp["data"] access below, crashing with an unrelated KeyError.
        logger.error("Failed to deploy application response %r", resp)
        raise RuntimeError("Failed to deploy application response %r" % (resp,))
    service_name = resp["data"]["service_name"]
    sut_name = resp["data"]["sut_name"]
    logger.info(f"SUT application deployed with service_name {service_name}")
    # Poll until the application reports ready.
    running_at = None
    retry_count = 0
    while True:
        retry_interval = 10
        # Log progress once every 20 polls. BUG FIX: the counter was only
        # incremented inside this branch, so it never advanced past 0 and the
        # message could never fire.
        if retry_count % 20 == 19:
            logger.info(f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready...")
            logger.info("20 retrys log this message again.")
        retry_count += 1
        time.sleep(retry_interval)
        check_result, running_at = check_sut_ready_from_resp(
            service_name,
            running_at,
            readiness_timeout,
            restart_count_limit,
            pullimage_count_limit,
        )
        if check_result:
            break
    logger.info(f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}")
    return service_name, sut_name
def check_sut_ready_from_resp(
    service_name,
    running_at,
    readiness_timeout,
    restart_count_limit,
    pullimage_count_limit,
):
    """Poll the job SUT info API once and decide whether the SUT is ready.

    Args:
        service_name (str): service name, used in log/error messages
        running_at (float | None): epoch seconds when all pods first reached
            Running; None until that happens
        readiness_timeout (int): max seconds allowed from Running to Ready
        restart_count_limit (int): max container restarts before aborting
        pullimage_count_limit (int): max image-pull failures before aborting

    Returns:
        tuple[bool, float | None]: (ready?, possibly-updated running_at)

    Raises:
        RuntimeError: on terminal/unexpected pod phases, restart or pull
            limits exceeded, or readiness timeout.
    """
    try:
        resp = requests.get(
            f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
            headers=headers,
            params={"with_detail": True},
            timeout=600,
        )
    except Exception as e:
        # BUG FIX: `e` was passed as a stray %-format argument after an
        # f-string message, which the logging module rejects.
        logger.warning("Exception occured while getting SUT application %s status: %s", service_name, e)
        return False, running_at
    if resp.status_code != 200:
        logger.warning(f"Get SUT application {service_name} status failed with status_code {resp.status_code}")
        return False, running_at
    resp = resp.json()
    if not resp["success"]:
        logger.warning(f"Get SUT application {service_name} status failed with response {resp}")
        return False, running_at
    if len(resp["data"]["sut"]) == 0:
        logger.warning("Empty SUT application status")
        return False, running_at
    # Log a copy of the status list with the verbose "detail" field stripped.
    resp_data_sut = copy.deepcopy(resp["data"]["sut"])
    for status in resp_data_sut:
        del status["detail"]
    logger.info(f"Got SUT application status: {resp_data_sut}")
    for status in resp["data"]["sut"]:
        if status["phase"] in ["Succeeded", "Failed"]:
            raise RuntimeError(f"Some pods of SUT application {service_name} terminated with status {status}")
        elif status["phase"] in ["Pending", "Unknown"]:
            return False, running_at
        elif status["phase"] != "Running":
            raise RuntimeError(f"Unexcepted pod status {status} of SUT application {service_name}")
        # First time every pod is Running: start the readiness clock.
        if running_at is None:
            running_at = time.time()
        for ct in status["detail"]["status"]["container_statuses"]:
            if ct["restart_count"] > 0:
                logger.info(f"pod {status['pod_name']} restart count = {ct['restart_count']}")
                if ct["restart_count"] > restart_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} restart too many times(over {restart_count_limit})")
            if (
                ct["state"]["waiting"] is not None
                and "reason" in ct["state"]["waiting"]
                and ct["state"]["waiting"]["reason"] in ["ImagePullBackOff", "ErrImagePull"]
            ):
                # .get keeps the increment safe even when pull_num lacks a
                # default factory (the module-level defaultdict() has none).
                pull_num[status["pod_name"]] = pull_num.get(status["pod_name"], 0) + 1
                # BUG FIX: the count was a literal "{...}" placeholder inside
                # a %-formatted string and was never interpolated.
                logger.info(
                    "pod %s has %d times inspect pulling image info: %s"
                    % (status["pod_name"], pull_num[status["pod_name"]], ct["state"]["waiting"])
                )
                if pull_num[status["pod_name"]] > pullimage_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
        if not status["conditions"]["Ready"]:
            if running_at is not None and time.time() - running_at > readiness_timeout:
                raise RuntimeError(f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s")
            return False, running_at
    return True, running_at
def parse_resource(resource):
    """Convert a k8s resource quantity string to a number in base units.

    Supports the decimal suffixes m/K/M/G/T/P/E and the binary suffixes
    Ki/Mi/Gi/Ti/Pi/Ei; a bare number is taken as-is. The sentinel -1
    (meaning "unlimited") passes through unchanged.

    Args:
        resource (str | int): quantity such as "500m", "2Gi", "4", or -1

    Returns:
        float | int: value in base units (cores / bytes), or -1

    Raises:
        ValueError: if the input is not a recognizable quantity.
    """
    if resource == -1:
        return -1
    # BUG FIX: the original re.match could return None for non-numeric input
    # and crash with AttributeError on .groups(); fullmatch also rejects
    # trailing garbage that a prefix match silently accepted.
    match = re.fullmatch(r"([\d.]+)([mKMGTPE]i?|)", str(resource))
    if match is None:
        raise ValueError(f"Unknown resource quantity: {resource}")
    value, unit = match.groups()
    value = float(value)
    unit_mapping = {
        "": 1,
        "m": 1e-3,
        "K": 1e3,
        "M": 1e6,
        "G": 1e9,
        "T": 1e12,
        "P": 1e15,
        "E": 1e18,
        "Ki": 2**10,
        "Mi": 2**20,
        "Gi": 2**30,
        "Ti": 2**40,
        "Pi": 2**50,
        "Ei": 2**60,
    }
    if unit not in unit_mapping:
        raise ValueError(f"Unknown resources unit: {unit}")
    return value * unit_mapping[unit]
def limit_resources(resource):
    """Clamp CPU and memory limits to the platform caps (30 cores / 100Gi).

    Mutates *resource* in place and returns it; a dict without a "limits"
    section is returned unchanged.
    """
    if "limits" not in resource:
        return resource
    if "cpu" in resource["limits"]:
        cpu_limit = parse_resource(resource["limits"]["cpu"])
        if cpu_limit > 30:
            logger.error("CPU limit exceeded. Adjusting to 30 cores.")
            resource["limits"]["cpu"] = "30"
    if "memory" in resource["limits"]:
        memory_limit = parse_resource(resource["limits"]["memory"])
        if memory_limit > 100 * 2**30:
            logger.error("Memory limit exceeded, adjusting to 100Gi")
            resource["limits"]["memory"] = "100Gi"
    # BUG FIX: previously fell off the end and returned None on this path,
    # while the early exit above returned the dict.
    return resource
def consistent_resources(resource):
    """Make the "requests" and "limits" sections agree, preferring "limits".

    When only one section exists, the other is filled with a copy of it;
    when both exist, "requests" is overwritten from "limits". Mutates
    *resource* in place and returns it.
    """
    if "limits" not in resource and "requests" not in resource:
        return resource
    if "limits" in resource:
        # BUG FIX: copy instead of aliasing the same dict, so later edits to
        # one section no longer silently change the other.
        resource["requests"] = dict(resource["limits"])
    else:
        resource["limits"] = dict(resource["requests"])
    return resource
def resource_check(values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and normalize GPU-related resource settings in helm values.

    When a GPU is requested, pins the vGPU memory/core shares, mirrors the
    limits into requests, forces the A100 vgpu node selector, and adds the
    vgpu toleration. Only exactly one A100-SXM4-80GBvgpu GPU is allowed.

    Mutates and returns *values*; non-GPU values pass through unchanged.

    Raises:
        RuntimeError: if the GPU type or count is not allowed.
    """
    resources = values.get("resources", {}).get("limits", {})
    if "nvidia.com/gpu" in resources and int(resources["nvidia.com/gpu"]) > 0:
        # Fixed vGPU share: 8 GiB of device memory, 10% of SM cores.
        values["resources"]["limits"]["nvidia.com/gpumem"] = 8192
        values["resources"]["limits"]["nvidia.com/gpucores"] = 10
        values["resources"]["requests"] = values["resources"].get("requests", {})
        if "cpu" not in values["resources"]["requests"] and "cpu" in values["resources"]["limits"]:
            values["resources"]["requests"]["cpu"] = values["resources"]["limits"]["cpu"]
        if "memory" not in values["resources"]["requests"] and "memory" in values["resources"]["limits"]:
            values["resources"]["requests"]["memory"] = values["resources"]["limits"]["memory"]
        values["resources"]["requests"]["nvidia.com/gpu"] = values["resources"]["limits"]["nvidia.com/gpu"]
        values["resources"]["requests"]["nvidia.com/gpumem"] = 8192
        values["resources"]["requests"]["nvidia.com/gpucores"] = 10
        values["nodeSelector"] = values.get("nodeSelector", {})
        if "contest.4pd.io/accelerator" not in values["nodeSelector"]:
            values["nodeSelector"]["contest.4pd.io/accelerator"] = "A100-SXM4-80GBvgpu"
        gpu_type = values["nodeSelector"]["contest.4pd.io/accelerator"]
        gpu_num = resources["nvidia.com/gpu"]
        if gpu_type != "A100-SXM4-80GBvgpu":
            raise RuntimeError("GPU类型只能为A100-SXM4-80GBvgpu")
        # BUG FIX: gpu_num is typically the raw string quantity ("1"); the
        # original `gpu_num != 1` rejected a valid single-GPU request.
        # Compare numerically, consistent with the int(...) guard above.
        if int(gpu_num) != 1:
            raise RuntimeError("GPU个数只能为1")
        values["tolerations"] = values.get("tolerations", [])
        values["tolerations"].append(
            {
                "key": "hosttype",
                "operator": "Equal",
                "value": "vgpu",
                "effect": "NoSchedule",
            }
        )
    return values