# -*- coding: utf-8 -*-
"""Helpers for packaging, deploying and readiness-checking SUT helm charts."""
import copy
import io
import json
import os
import re
import tarfile
import time
from collections import defaultdict
from typing import Any, Dict, Optional

import requests
from ruamel.yaml import YAML

from utils.logger import logger

# Root of the SUT helm chart shipped alongside this package.
sut_chart_root = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
)

_api_token = os.getenv("LEADERBOARD_API_TOKEN")
headers = {'Authorization': 'Bearer ' + _api_token} if _api_token else None

# Per-pod count of observed image-pull failures.
# BUGFIX: defaultdict() without a factory raises KeyError on the first
# `pull_num[pod] += 1`; int() gives the intended start-at-zero behaviour.
pull_num: defaultdict = defaultdict(int)

JOB_ID = int(os.getenv("JOB_ID", "-1"))
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")


def apply_env_to_values(values, envs):
    """Merge *envs* into the ``env`` list of helm *values* (in place).

    Existing entries with the same name are overwritten; new names are
    appended as ``{"name": ..., "value": ...}`` items.

    Args:
        values (dict): helm values, possibly without an "env" key yet.
        envs (dict): environment-variable name -> value mapping.

    Returns:
        dict: the same *values* object, for chaining.
    """
    if "env" not in values:
        values["env"] = []
    old_key_list = [x["name"] for x in values["env"]]
    for k, v in envs.items():
        try:
            idx = old_key_list.index(k)
            values["env"][idx]["value"] = v
        except ValueError:
            # Name not present yet -> append a new entry.
            values["env"].append({"name": k, "value": v})
    return values


def merge_values(base_value, incr_value):
    """Recursively merge *incr_value* into *base_value*.

    Dicts merge key-wise, lists concatenate, and any other type is replaced
    by the incremental value.  *base_value* may be mutated; always use the
    return value.
    """
    if isinstance(base_value, dict) and isinstance(incr_value, dict):
        for k in incr_value:
            base_value[k] = merge_values(base_value[k], incr_value[k]) if k in base_value else incr_value[k]
    elif isinstance(base_value, list) and isinstance(incr_value, list):
        base_value.extend(incr_value)
    else:
        base_value = incr_value
    return base_value


def gen_chart_tarball(docker_image):
    """Resolve *docker_image* to a digest (when configured) and build the SUT
    helm chart tarball.  Any failure raises immediately.

    Args:
        docker_image (str): docker image reference ("repository:tag").

    Returns:
        tuple[BytesIO, dict]: (helm chart tarball file object, rendered values)

    Raises:
        RuntimeError: when the hash API fails or the image reference is malformed.
    """
    # load values template
    with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
        yaml = YAML(typ="rt")
        values = yaml.load(fp)

    # update docker_image
    get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
    logger.info(f"get_image_hash_url: {get_image_hash_url}")
    if get_image_hash_url is not None:
        # BUGFIX: removed a leftover debug assignment that unconditionally
        # replaced the caller-supplied image with a hard-coded one
        # ("harbor-contest.4pd.io/zhoushasha/image_classification:wo_model_v3").
        resp = requests.get(get_image_hash_url, headers=headers, params={"image": docker_image}, timeout=600)
        logger.info(f"resp.text: {resp.text}")
        # Explicit raises instead of assert: asserts are stripped under `python -O`.
        if resp.status_code != 200:
            raise RuntimeError("Convert tag to hash for docker image failed, API retcode %d" % resp.status_code)
        resp = resp.json()
        if not resp["success"]:
            raise RuntimeError("Convert tag to hash for docker image failed, response: %s" % str(resp))
        # Digest-pinned references look like "repo:tag:sha256-digest" -> 3 parts.
        token = resp["data"]["image"].rsplit(":", 2)
        if len(token) != 3:
            raise RuntimeError("Invalid docker image %s" % resp["data"]["image"])
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = ":".join(token[1:])
    else:
        token = docker_image.rsplit(":", 1)
        if len(token) != 2:
            raise RuntimeError("Invalid docker image %s" % docker_image)
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = token[1]

    # output values.yaml
    with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
        yaml = YAML(typ="rt")
        yaml.dump(values, fp)

    # tarball
    tarfp = io.BytesIO()
    with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
        tar.add(sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True)
    tarfp.seek(0)
    logger.debug(f"Generated chart using values: {values}")
    return tarfp, values


def deploy_chart(
    name_suffix,
    readiness_timeout,
    chart_str=None,
    chart_fileobj=None,
    extra_values=None,
    restart_count_limit=3,
    pullimage_count_limit=3,
):
    """Deploy the SUT and block until it is ready.  Any failure raises.

    Args:
        name_suffix (str): distinguishes multiple SUTs within the same job.
        readiness_timeout (int): readiness timeout in seconds.
        chart_str (str, optional): chart url; when set, chart_fileobj is ignored. Defaults to None.
        chart_fileobj (BytesIO, optional): helm chart tarball, used when chart_str is None. Defaults to None.
        extra_values (dict, optional): extra helm values. Defaults to None.
        restart_count_limit (int, optional): abort when a pod restarts more often. Defaults to 3.
        pullimage_count_limit (int, optional): abort when image pulls fail more often. Defaults to 3.

    Returns:
        tuple[str, str]: (k8s service name for reaching the SUT, sut name for unload_sut)
    """
    logger.info(f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}")
    # deploy
    payload = {
        "job_id": JOB_ID,
        "resource_name": name_suffix,
        "priorityclassname": os.environ.get("priorityclassname"),
    }
    extra_values = {} if not extra_values else extra_values
    payload["values"] = json.dumps(extra_values, ensure_ascii=False)
    if chart_str is not None:
        payload["helm_chart"] = chart_str
        resp = requests.post(LOAD_SUT_URL, data=payload, headers=headers, timeout=600)
    else:
        # Explicit raise instead of assert (stripped under `python -O`).
        if chart_fileobj is None:
            raise ValueError("Either chart_str or chart_fileobj should be set")
        logger.info(f"LOAD_SUT_URL: {LOAD_SUT_URL}")
        logger.info(f"payload: {payload}")
        logger.info(f"headers: {headers}")
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=headers,
            files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
            timeout=600,
        )
    if resp.status_code != 200:
        raise RuntimeError("Failed to deploy application status_code %d %s" % (resp.status_code, resp.text))
    resp = resp.json()
    if not resp["success"]:
        # BUGFIX: previously only logged and then dereferenced resp["data"]
        # anyway; a failed deploy must abort here.
        logger.error("Failed to deploy application response %r", resp)
        raise RuntimeError("Failed to deploy application response %r" % (resp,))
    service_name = resp["data"]["service_name"]
    sut_name = resp["data"]["sut_name"]
    logger.info(f"SUT application deployed with service_name {service_name}")

    # waiting for application ready
    running_at = None
    retry_count = 0
    while True:
        retry_interval = 10
        # BUGFIX: retry_count was only incremented inside the log branch, so it
        # stayed at 0 forever and the progress message never appeared; now the
        # message is logged every 20th poll (including the first).
        if retry_count % 20 == 0:
            logger.info(f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready...")
            logger.info("20 retrys log this message again.")
        retry_count += 1
        time.sleep(retry_interval)
        check_result, running_at = check_sut_ready_from_resp(
            service_name,
            running_at,
            readiness_timeout,
            restart_count_limit,
            pullimage_count_limit,
        )
        if check_result:
            break
    logger.info(f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}")
    return service_name, sut_name


def check_sut_ready_from_resp(
    service_name,
    running_at,
    readiness_timeout,
    restart_count_limit,
    pullimage_count_limit,
):
    """Poll the job SUT-info API once and decide whether every pod is ready.

    Args:
        service_name (str): k8s service name, used for logging only.
        running_at (float | None): timestamp when pods first entered Running,
            or None if not yet observed; threaded through successive calls.
        readiness_timeout (int): seconds allowed between Running and Ready.
        restart_count_limit (int): max tolerated container restarts.
        pullimage_count_limit (int): max tolerated image-pull failures per pod.

    Returns:
        tuple[bool, float | None]: (all pods ready, updated running_at)

    Raises:
        RuntimeError: on terminal pod phases, too many restarts, repeated
            image-pull failures, or readiness timeout.
    """
    try:
        resp = requests.get(
            f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
            headers=headers,
            params={"with_detail": True},
            timeout=600,
        )
    except Exception as e:
        # BUGFIX: the exception was passed as a stray positional argument that
        # logging silently dropped; interpolate it into the message instead.
        logger.warning(f"Exception occured while getting SUT application {service_name} status: {e}")
        return False, running_at
    if resp.status_code != 200:
        logger.warning(f"Get SUT application {service_name} status failed with status_code {resp.status_code}")
        return False, running_at
    resp = resp.json()
    if not resp["success"]:
        logger.warning(f"Get SUT application {service_name} status failed with response {resp}")
        return False, running_at
    if len(resp["data"]["sut"]) == 0:
        logger.warning("Empty SUT application status")
        return False, running_at

    # Log a copy of the statuses with the verbose per-pod "detail" stripped.
    resp_data_sut = copy.deepcopy(resp["data"]["sut"])
    for status in resp_data_sut:
        del status["detail"]
    logger.info(f"Got SUT application status: {resp_data_sut}")

    for status in resp["data"]["sut"]:
        if status["phase"] in ["Succeeded", "Failed"]:
            raise RuntimeError(f"Some pods of SUT application {service_name} terminated with status {status}")
        elif status["phase"] in ["Pending", "Unknown"]:
            return False, running_at
        elif status["phase"] != "Running":
            raise RuntimeError(f"Unexcepted pod status {status} of SUT application {service_name}")
        # Pod is Running; remember when we first saw that, so the readiness
        # timeout below measures Running -> Ready.
        if running_at is None:
            running_at = time.time()
        for ct in status["detail"]["status"]["container_statuses"]:
            if ct["restart_count"] > 0:
                logger.info(f"pod {status['pod_name']} restart count = {ct['restart_count']}")
                if ct["restart_count"] > restart_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} restart too many times(over {restart_count_limit})")
            if (
                ct["state"]["waiting"] is not None
                and "reason" in ct["state"]["waiting"]
                and ct["state"]["waiting"]["reason"] in ["ImagePullBackOff", "ErrImagePull"]
            ):
                pull_num[status["pod_name"]] += 1
                # BUGFIX: the message mixed %-formatting with un-f'd {...}
                # braces, so the pull count was printed literally; use a
                # single f-string.
                logger.info(
                    f"pod {status['pod_name']} has {pull_num[status['pod_name']]} times inspect pulling image info: {ct['state']['waiting']}"
                )
                if pull_num[status["pod_name"]] > pullimage_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
        if not status["conditions"]["Ready"]:
            if running_at is not None and time.time() - running_at > readiness_timeout:
                raise RuntimeError(f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s")
            return False, running_at
    return True, running_at


def parse_resource(resource):
    """Convert a k8s resource quantity (e.g. "100m", "2", "4Gi") to a float.

    The sentinel -1 (meaning "no limit") is returned unchanged.

    Raises:
        ValueError: when the quantity or its unit cannot be parsed.
    """
    if resource == -1:
        return -1
    # str() so that plain numeric yaml values (e.g. cpu: 4) parse too.
    match = re.match(r"([\d\.]+)([mKMGTPENi]*)", str(resource))
    # BUGFIX: guard against an unparseable quantity instead of crashing with
    # AttributeError on `None.groups()`.
    if match is None:
        raise ValueError(f"Cannot parse resource quantity: {resource}")
    value, unit = match.groups()
    value = float(value)
    unit_mapping = {
        "": 1,
        "m": 1e-3,
        "K": 1e3,
        "M": 1e6,
        "G": 1e9,
        "T": 1e12,
        "P": 1e15,
        "E": 1e18,
        "Ki": 2**10,
        "Mi": 2**20,
        "Gi": 2**30,
        "Ti": 2**40,
        "Pi": 2**50,
        "Ei": 2**60,
    }
    if unit not in unit_mapping:
        raise ValueError(f"Unknown resources unit: {unit}")
    return value * unit_mapping[unit]


def limit_resources(resource):
    """Clamp CPU limit to 30 cores and memory limit to 100Gi (in place).

    Returns:
        dict: the same *resource* dict, for chaining.
    """
    if "limits" not in resource:
        return resource
    if "cpu" in resource["limits"]:
        cpu_limit = parse_resource(resource["limits"]["cpu"])
        if cpu_limit > 30:
            logger.error("CPU limit exceeded. Adjusting to 30 cores.")
            resource["limits"]["cpu"] = "30"
    if "memory" in resource["limits"]:
        memory_limit = parse_resource(resource["limits"]["memory"])
        if memory_limit > 100 * 2**30:
            logger.error("Memory limit exceeded, adjusting to 100Gi")
            resource["limits"]["memory"] = "100Gi"
    # BUGFIX: the modify path previously fell off the end and returned None,
    # inconsistent with the early return above.
    return resource


def consistent_resources(resource):
    """Ensure both "limits" and "requests" exist by mirroring one from the other."""
    if "limits" not in resource and "requests" not in resource:
        return resource
    elif "limits" in resource:
        # BUGFIX: deep-copy instead of aliasing — sharing one dict meant any
        # later edit to requests silently mutated limits (and vice versa).
        resource["requests"] = copy.deepcopy(resource["limits"])
    else:
        resource["limits"] = copy.deepcopy(resource["requests"])
    return resource


def resource_check(values: Dict[str, Any]):
    """Enforce the contest GPU policy on helm *values* (in place).

    When a GPU limit is present: pin vGPU memory/cores, mirror limits into
    requests, force the A100 vgpu node selector and add the matching
    scheduling toleration.

    Raises:
        RuntimeError: when the GPU type or count is not allowed.
    """
    resources = values.get("resources", {}).get("limits", {})
    if "nvidia.com/gpu" in resources and int(resources["nvidia.com/gpu"]) > 0:
        limits = values["resources"]["limits"]
        limits["nvidia.com/gpumem"] = 8192
        limits["nvidia.com/gpucores"] = 10
        requests_section = values["resources"].setdefault("requests", {})
        # Mirror cpu/memory limits into requests only when not set explicitly.
        if "cpu" not in requests_section and "cpu" in limits:
            requests_section["cpu"] = limits["cpu"]
        if "memory" not in requests_section and "memory" in limits:
            requests_section["memory"] = limits["memory"]
        requests_section["nvidia.com/gpu"] = limits["nvidia.com/gpu"]
        requests_section["nvidia.com/gpumem"] = 8192
        requests_section["nvidia.com/gpucores"] = 10

        node_selector = values.setdefault("nodeSelector", {})
        if "contest.4pd.io/accelerator" not in node_selector:
            node_selector["contest.4pd.io/accelerator"] = "A100-SXM4-80GBvgpu"
        gpu_type = node_selector["contest.4pd.io/accelerator"]
        # BUGFIX: compare as int — yaml may deliver the count as a string
        # (the guard above already coerces with int()), so '"1" != 1' would
        # wrongly reject a valid single-GPU request.
        gpu_num = int(resources["nvidia.com/gpu"])
        if gpu_type != "A100-SXM4-80GBvgpu":
            raise RuntimeError("GPU类型只能为A100-SXM4-80GBvgpu")
        if gpu_num != 1:
            raise RuntimeError("GPU个数只能为1")
        values.setdefault("tolerations", []).append(
            {
                "key": "hosttype",
                "operator": "Equal",
                "value": "vgpu",
                "effect": "NoSchedule",
            }
        )
    return values