import copy
import io
import json
import logging
import os
import tarfile
import time
from collections import defaultdict
from typing import Any, Dict

import requests
from ruamel.yaml import YAML

# Root of the SUT helm chart shipped alongside this package: <repo>/helm-chart/sut
sut_chart_root = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "helm-chart", "sut"
)

logger = logging.getLogger(__name__)

# Authorization header for the leaderboard API; None when no token is configured.
lb_headers = (
    {"Authorization": "Bearer " + os.getenv("LEADERBOARD_API_TOKEN", "")}
    if os.getenv("LEADERBOARD_API_TOKEN")
    else None
)

# Per-pod counter of observed image-pull failures.
# NOTE(fix): an int factory is required -- a bare defaultdict() has no default,
# so `pull_num[pod] += 1` below would raise KeyError on first use.
pull_num: defaultdict = defaultdict(int)

# Required environment; fail fast at import time. Explicit raises are used
# instead of `assert` because asserts are stripped under `python -O`.
JOB_ID = int(os.getenv("JOB_ID", "-1"))
if JOB_ID == -1:
    raise RuntimeError("JOB_ID environment variable is required")
LOAD_SUT_URL = os.getenv("LOAD_SUT_URL")
if LOAD_SUT_URL is None:
    raise RuntimeError("LOAD_SUT_URL environment variable is required")
GET_JOB_SUT_INFO_URL = os.getenv("GET_JOB_SUT_INFO_URL")
if GET_JOB_SUT_INFO_URL is None:
    raise RuntimeError("GET_JOB_SUT_INFO_URL environment variable is required")


def apply_env_to_values(values, envs):
    """Merge the env var mapping ``envs`` into the helm ``values["env"]`` list.

    Entries whose ``name`` already exists get their ``value`` overwritten;
    new names are appended as ``{"name": ..., "value": ...}``.
    Mutates and returns ``values``.
    """
    if "env" not in values:
        values["env"] = []
    # Map each name to the index of its first occurrence (matches the
    # original list.index() semantics if duplicate names are present).
    index_of: Dict[str, int] = {}
    for i, entry in enumerate(values["env"]):
        index_of.setdefault(entry["name"], i)
    for name, value in envs.items():
        if name in index_of:
            values["env"][index_of[name]]["value"] = value
        else:
            values["env"].append({"name": name, "value": value})
    return values


def merge_values(base_value, incr_value):
    """Recursively merge ``incr_value`` into ``base_value``.

    Dicts merge key-by-key, lists are concatenated, any other type in
    ``incr_value`` replaces the base value. ``base_value`` is mutated in
    place for dicts/lists; the merged result is returned.
    """
    if isinstance(base_value, dict) and isinstance(incr_value, dict):
        for k in incr_value:
            base_value[k] = (
                merge_values(base_value[k], incr_value[k])
                if k in base_value
                else incr_value[k]
            )
    elif isinstance(base_value, list) and isinstance(incr_value, list):
        base_value.extend(incr_value)
    else:
        base_value = incr_value
    return base_value


def gen_chart_tarball(docker_image, policy=None):
    """Resolve the docker image to its digest and build a helm chart tarball.

    Raises RuntimeError on any failure (previously ``assert``, which is
    stripped under ``python -O``).

    Args:
        docker_image: docker image reference ("repository:tag").
        policy: value to set as ``image.pullPolicy`` in the chart values.

    Returns:
        tuple[BytesIO, dict]: [helm chart .tgz file object, values content]
    """
    # Load the values template; round-trip mode preserves YAML formatting.
    with open(os.path.join(sut_chart_root, "values.yaml.tmpl")) as fp:
        yaml = YAML(typ="rt")
        values = yaml.load(fp)

    get_image_hash_url = os.getenv("GET_IMAGE_HASH_URL", None)
    if get_image_hash_url is not None:
        # Resolve the mutable tag to an immutable digest via the API.
        resp = requests.get(
            get_image_hash_url,
            headers=lb_headers,
            params={"image": docker_image},
            timeout=600,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, API retcode %d"
                % resp.status_code
            )
        resp = resp.json()
        if not resp["success"]:
            raise RuntimeError(
                "Convert tag to hash for docker image failed, response: %s" % str(resp)
            )
        # The resolved image is expected to contain two ':' separators
        # (repository plus tag@digest parts) -- split off the repository and
        # rejoin the rest as the tag.
        token = resp["data"]["image"].rsplit(":", 2)
        if len(token) != 3:
            raise RuntimeError("Invalid docker image %s" % resp["data"]["image"])
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = ":".join(token[1:])
    else:
        token = docker_image.rsplit(":", 1)
        if len(token) != 2:
            raise RuntimeError("Invalid docker image %s" % docker_image)
        values["image"]["repository"] = token[0]
        values["image"]["tag"] = token[1]
    values["image"]["pullPolicy"] = policy

    # Write the resolved values.yaml into the chart directory before packing.
    with open(os.path.join(sut_chart_root, "values.yaml"), "w") as fp:
        yaml = YAML(typ="rt")
        yaml.dump(values, fp)

    # Pack the chart directory into an in-memory gzip'd tarball.
    tarfp = io.BytesIO()
    with tarfile.open(fileobj=tarfp, mode="w:gz") as tar:
        tar.add(
            sut_chart_root, arcname=os.path.basename(sut_chart_root), recursive=True
        )
    tarfp.seek(0)
    logger.debug(f"Generated chart using values: {values}")
    return tarfp, values


def deploy_chart(
    name_suffix,
    readiness_timeout,
    chart_str=None,
    chart_fileobj=None,
    extra_values=None,
    restart_count_limit=3,
    pullimage_count_limit=3,
):
    """Deploy the SUT and block until it is ready; raises on any failure.

    Args:
        name_suffix (str): distinguishes SUTs when one job deploys several.
        readiness_timeout (int): readiness timeout in seconds.
        chart_str (str, optional): chart URL; when set, chart_fileobj is
            ignored. Defaults to None.
        chart_fileobj (BytesIO, optional): helm chart tarball file object,
            used when chart_str is None. Defaults to None.
        extra_values (dict, optional): extra helm values. Defaults to None.
        restart_count_limit (int, optional): max SUT restarts before
            aborting. Defaults to 3.
        pullimage_count_limit (int, optional): max image pull attempts before
            aborting. Defaults to 3.

    Returns:
        tuple[str, str]: [k8s service name for reaching the SUT,
        name to pass to unload_sut]
    """
    logger.info(
        f"Deploying SUT application for JOB {JOB_ID}, name_suffix {name_suffix}, extra_values {extra_values}"
    )
    # Build the deploy request payload.
    payload = {
        "job_id": JOB_ID,
        "resource_name": name_suffix,
        "priorityclassname": os.environ.get("priorityclassname"),
    }
    extra_values = {} if not extra_values else extra_values
    payload["values"] = json.dumps(extra_values, ensure_ascii=False)
    if chart_str is not None:
        payload["helm_chart"] = chart_str
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            timeout=600,
        )
    else:
        if chart_fileobj is None:
            raise RuntimeError("Either chart_str or chart_fileobj should be set")
        resp = requests.post(
            LOAD_SUT_URL,
            data=payload,
            headers=lb_headers,
            files=[("helm_chart_file", (name_suffix + ".tgz", chart_fileobj))],
            timeout=600,
        )
    if resp.status_code != 200:
        raise RuntimeError(
            "Failed to deploy application status_code %d %s"
            % (resp.status_code, resp.text)
        )
    resp = resp.json()
    if not resp["success"]:
        raise RuntimeError("Failed to deploy application response %r" % resp)
    service_name = resp["data"]["service_name"]
    sut_name = resp["data"]["sut_name"]
    logger.info(f"SUT application deployed with service_name {service_name}")

    # Poll until the application reports ready (or a check raises).
    running_at = None
    while True:
        retry_interval = 10
        logger.info(
            f"Waiting {retry_interval} seconds to check whether SUT application {service_name} is ready..."
        )
        time.sleep(retry_interval)
        check_result, running_at = check_sut_ready_from_resp(
            service_name,
            running_at,
            readiness_timeout,
            restart_count_limit,
            pullimage_count_limit,
        )
        if check_result:
            break
    logger.info(
        f"SUT application for JOB {JOB_ID} name_suffix {name_suffix} is ready, service_name {service_name}"
    )
    return service_name, sut_name


def check_sut_ready_from_resp(
    service_name,
    running_at,
    readiness_timeout,
    restart_count_limit,
    pullimage_count_limit,
):
    """Poll the job SUT-info API once and decide whether the SUT is ready.

    Args:
        service_name: k8s service name (used only for log messages here).
        running_at: timestamp when pods were first observed Running, or None.
        readiness_timeout: seconds allowed between first Running and Ready.
        restart_count_limit: max container restarts before aborting.
        pullimage_count_limit: max image-pull failures before aborting.

    Returns:
        tuple[bool, float | None]: (ready, running_at); transient failures
        return (False, running_at) so the caller retries.

    Raises:
        RuntimeError: pods terminated, restarted too often, cannot pull their
        image, or readiness exceeded ``readiness_timeout``.
    """
    try:
        resp = requests.get(
            f"{GET_JOB_SUT_INFO_URL}/{JOB_ID}",
            headers=lb_headers,
            params={"with_detail": True},
            timeout=600,
        )
    except Exception:
        # FIX: the original passed the exception as a %-arg to an f-string
        # with no placeholder; logger.exception logs it with a traceback.
        logger.exception(
            "Exception occured while getting SUT application %s status", service_name
        )
        return False, running_at
    if resp.status_code != 200:
        logger.warning(
            f"Get SUT application {service_name} status failed with status_code {resp.status_code}"
        )
        return False, running_at
    resp = resp.json()
    if not resp["success"]:
        logger.warning(
            f"Get SUT application {service_name} status failed with response {resp}"
        )
        return False, running_at
    if len(resp["data"]["sut"]) == 0:
        logger.warning("Empty SUT application status")
        return False, running_at

    # Log a copy with the verbose per-pod "detail" stripped out.
    resp_data_sut = copy.deepcopy(resp["data"]["sut"])
    for status in resp_data_sut:
        del status["detail"]
    logger.info(f"Got SUT application status: {resp_data_sut}")

    for status in resp["data"]["sut"]:
        if status["phase"] in ["Succeeded", "Failed"]:
            raise RuntimeError(
                f"Some pods of SUT application {service_name} terminated with status {status}"
            )
        elif status["phase"] in ["Pending", "Unknown"]:
            return False, running_at
        elif status["phase"] != "Running":
            raise RuntimeError(
                f"Unexcepted pod status {status} of SUT application {service_name}"
            )
        # First time we see Running: start the readiness clock.
        if running_at is None:
            running_at = time.time()
        for ct in status["detail"]["status"]["container_statuses"]:
            if ct["restart_count"] > 0:
                logger.info(
                    f"pod {status['pod_name']} restart count = {ct['restart_count']}"
                )
                if ct["restart_count"] > restart_count_limit:
                    raise RuntimeError(
                        f"pod {status['pod_name']} restart too many times(over {restart_count_limit})"
                    )
            if (
                ct["state"]["waiting"] is not None
                and "reason" in ct["state"]["waiting"]
                and ct["state"]["waiting"]["reason"]
                in ["ImagePullBackOff", "ErrImagePull"]
            ):
                # Works only because pull_num is defaultdict(int).
                pull_num[status["pod_name"]] += 1
                # FIX: the original mixed f-string braces into a %-format
                # string, so the pull count was never interpolated.
                logger.info(
                    "pod %s has %d times inspect pulling image info: %s",
                    status["pod_name"],
                    pull_num[status["pod_name"]],
                    ct["state"]["waiting"],
                )
                if pull_num[status["pod_name"]] > pullimage_count_limit:
                    raise RuntimeError(f"pod {status['pod_name']} cannot pull image")
        if not status["conditions"]["Ready"]:
            if running_at is not None and time.time() - running_at > readiness_timeout:
                raise RuntimeError(
                    f"SUT Application readiness has exceeded readiness_timeout:{readiness_timeout}s"
                )
            return False, running_at
    return True, running_at


def resource_check(values: Dict[str, Any], image: str):
    """Fill in image, resource limits and scheduling constraints for the SUT.

    Mutates and returns ``values``.
    """
    # Image settings. (The original assigned "IfNotPresent" and then
    # immediately overwrote it with "Always"; the dead assignment is removed.)
    values["docker_image"] = image
    values["image_pull_policy"] = "Always"
    # Resource limits: 4 CPU / 16Gi RAM / 1 Iluvatar GPU for both limits
    # and requests (Guaranteed QoS).
    values["resources"] = {
        "limits": {"cpu": 4, "memory": "16Gi", "iluvatar.ai/gpu": "1"},
        "requests": {"cpu": 4, "memory": "16Gi", "iluvatar.ai/gpu": "1"},
    }
    # Schedule only onto Iluvatar BI-V100 nodes and tolerate their taint.
    values["nodeSelector"] = {"contest.4pd.io/accelerator": "iluvatar-BI-V100"}
    values["tolerations"] = [
        {
            "key": "hosttype",
            "operator": "Equal",
            "value": "iluvatar",
            "effect": "NoSchedule",
        }
    ]
    # TODO: add node/accelerator selection rules
    return values