Files
enginex-bi_series-vc-cnn/run_callback.py
zhousha 44954b7481 update
2025-08-06 15:45:17 +08:00

720 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import sys
import time
import tempfile
import zipfile
import threading
from collections import defaultdict
from typing import Dict, List
import yaml
from pydantic import ValidationError
from schemas.dataset import QueryData
from utils.client_callback import ClientCallback, EvaluateResult, StopException
from utils.logger import log
from utils.service import register_sut
from utils.update_submit import change_product_available
from utils.file import dump_json, load_yaml, unzip_dir, load_json, write_file, dump_yaml
from utils.leaderboard import change_product_unavailable
# Module-level lock.
# NOTE(review): not referenced anywhere in the visible part of this file.
lck = threading.Lock()
# Environment variables by leaderboard.
# These are read with os.environ[...], so a missing variable raises KeyError
# at import time.
DATASET_FILEPATH = os.environ["DATASET_FILEPATH"]
RESULT_FILEPATH = os.environ["RESULT_FILEPATH"]
DETAILED_CASES_FILEPATH = os.environ["DETAILED_CASES_FILEPATH"]
SUBMIT_CONFIG_FILEPATH = os.environ["SUBMIT_CONFIG_FILEPATH"]
BENCHMARK_NAME = os.environ["BENCHMARK_NAME"]
# Optional tuning knobs with defaults (1 and 0.8).
# NOTE(review): neither is referenced in the visible part of this file.
TEST_CONCURRENCY = int(os.getenv('TEST_CONCURRENCY', 1))
THRESHOLD_OMCER = float(os.getenv('THRESHOLD_OMCER', 0.8))
log.info(f"DATASET_FILEPATH: {DATASET_FILEPATH}")
# Directory the dataset archive is extracted into (see unzip_dir call below).
workspace_path = "/tmp/workspace"
# Environment variables by kubernetes
MY_POD_IP = os.environ["MY_POD_IP"]
# constants
# Name passed to register_sut() when the SUT is registered.
RESOURCE_NAME = BENCHMARK_NAME
# Environment variables by judge_flow_config
# Note the lower-case variable name "lang"; the value is None when it is unset.
LANG = os.getenv("lang")
SUT_CPU = os.getenv("SUT_CPU", "2")
SUT_MEMORY = os.getenv("SUT_MEMORY", "4Gi")
SUT_VGPU = os.getenv("SUT_VGPU", "1")
#SUT_VGPU_MEM = os.getenv("SUT_VGPU_MEM", str(1843 * int(SUT_VGPU)))
#SUT_VGPU_CORES = os.getenv("SUT_VGPU_CORES", str(8 * int(SUT_VGPU)))
SUT_VGPU_ACCELERATOR = os.getenv("SUT_VGPU_ACCELERATOR", "iluvatar-BI-V100")
RESOURCE_TYPE = os.getenv("RESOURCE_TYPE", "vgpu")
# Fail fast on an unsupported resource type.
# NOTE(review): `assert` is skipped when Python runs with -O.
assert RESOURCE_TYPE in [
"cpu",
"vgpu",
], "benchmark judge_flow_config error: RESOURCE_TYPE should be cpu or vgpu"
# Runs at import time. Presumably unpacks the dataset archive into
# workspace_path (project helper; confirm against utils.file.unzip_dir).
unzip_dir(DATASET_FILEPATH, workspace_path)
def get_sut_url_kubernetes():
    """Register the SUT with the cluster and return its HTTP base URL.

    Loads the submit config YAML from ``SUBMIT_CONFIG_FILEPATH`` and overrides
    its ``values.containers``, ``values.resources`` and ``values.nodeSelector``
    entries so the SUT is scheduled onto an Iluvatar GPU node. The resulting
    config is passed to ``register_sut`` together with ``RESOURCE_NAME``.

    Returns:
        The string returned by ``register_sut`` with ``ws://`` replaced by
        ``http://``.

    Raises:
        AssertionError: if the submit config does not parse to a mapping.
    """
    with open(SUBMIT_CONFIG_FILEPATH, "r", encoding="utf-8") as f:
        submit_config = yaml.safe_load(f)
    assert isinstance(submit_config, dict)

    values = submit_config.setdefault("values", {})

    # Placeholder container: replace image/command/args with the real model
    # image and its start-up command when deploying an actual service.
    values["containers"] = [
        {
            "name": "corex-container",
            "image": "harbor.4pd.io/lab-platform/inf/python:3.9",
            "command": ["sleep"],
            "args": ["3600"],
        }
    ]

    # Only the GPU resource is injected. SUT_CPU / SUT_MEMORY are deliberately
    # not applied here (that code was disabled in the original).
    gpu_resource = {
        "iluvatar.ai/gpu": SUT_VGPU,
    }
    # Separate copies so requests and limits never alias the same dict.
    values["resources"] = {
        "requests": dict(gpu_resource),
        "limits": dict(gpu_resource),
    }

    # Honour the configurable accelerator label instead of a hard-coded copy
    # of its default value ("iluvatar-BI-V100").
    values["nodeSelector"] = {
        "contest.4pd.io/accelerator": SUT_VGPU_ACCELERATOR,
    }

    log.info(f"submit_config: {submit_config}")
    log.info(f"RESOURCE_NAME: {RESOURCE_NAME}")
    return register_sut(submit_config, RESOURCE_NAME).replace(
        "ws://", "http://"
    )
def get_sut_url():
    """Return the SUT base URL by delegating to the Kubernetes registration."""
    sut_address = get_sut_url_kubernetes()
    return sut_address
#SUT_URL = get_sut_url()
#os.environ["SUT_URL"] = SUT_URL
#############################################################################
import requests
import base64
# Key of the single entry inside the ``parameter`` section of every request.
_SID_SERVICE_KEY = "s782b4996"

# API names whose request carries an audio ``payload`` read from ``file_path``.
_SID_AUDIO_APIS = ("createFeature", "searchFea", "searchScoreFea", "updateFeature")


def _sid_audio_payload(file_path):
    """Read ``file_path`` and return the ``payload`` section with its base64 audio."""
    with open(file_path, "rb") as f:
        audio_bytes = f.read()
    return {
        "resource": {
            "encoding": "lame",
            "sample_rate": 16000,
            "channels": 1,
            "bit_depth": 16,
            "status": 3,
            "audio": str(base64.b64encode(audio_bytes), 'UTF-8'),
        }
    }


def gen_req_body(apiname, APPId, file_path=None, featureId=None, featureInfo=None, dstFeatureId=None):
    """Build the JSON-serialisable request body for one voiceprint API call.

    Args:
        apiname: One of ``createFeature``, ``createGroup``, ``deleteFeature``,
            ``queryFeatureList``, ``searchFea``, ``searchScoreFea``,
            ``updateFeature`` or ``deleteGroup``.
        APPId: Application id placed in ``header.app_id``.
        file_path: Path of the audio file; required for ``createFeature``,
            ``searchFea``, ``searchScoreFea`` and ``updateFeature``.
        featureId: Feature id, used by ``createFeature`` only.
        featureInfo: Feature description, used by ``createFeature`` only.
        dstFeatureId: Target feature id, used by ``searchScoreFea`` only.

    Returns:
        A dict with ``header`` and ``parameter`` sections, plus a ``payload``
        section for the audio-carrying APIs.

    Raises:
        ValueError: if ``apiname`` is not supported, or an audio-carrying API
            is requested without ``file_path``.
        OSError: if the audio file cannot be read.
    """
    # API-specific fields, in the order they appear in the request.
    api_params = {
        "createFeature": {
            "groupId": "test_voiceprint_e",
            "featureId": featureId,
            "featureInfo": featureInfo,
        },
        "createGroup": {
            "groupId": "test_voiceprint_e",
            "groupName": "vip_user",
            "groupInfo": "store_vip_user_voiceprint",
        },
        "deleteFeature": {
            "groupId": "iFLYTEK_examples_groupId",
            "featureId": "iFLYTEK_examples_featureId",
        },
        "queryFeatureList": {
            "groupId": "user_voiceprint_2",
        },
        "searchFea": {
            "groupId": "test_voiceprint_e",
            "topK": 1,
        },
        "searchScoreFea": {
            "groupId": "test_voiceprint_e",
            "dstFeatureId": dstFeatureId,
        },
        "updateFeature": {
            "groupId": "iFLYTEK_examples_groupId",
            "featureId": "iFLYTEK_examples_featureId",
            "featureInfo": "iFLYTEK_examples_featureInfo_update",
        },
        "deleteGroup": {
            "groupId": "iFLYTEK_examples_groupId",
        },
    }

    if apiname not in api_params:
        # The list is derived from the table so it can never drift out of
        # sync with the supported names (the old text omitted deleteGroup).
        raise ValueError(
            f"输入的apiname不在[{', '.join(api_params)}]内,请检查")

    needs_audio = apiname in _SID_AUDIO_APIS
    if needs_audio and file_path is None:
        raise ValueError(f"file_path is required for apiname {apiname!r}")

    service_params = {"func": apiname}
    service_params.update(api_params[apiname])
    # Every API asks for its result under "<apiname>Res" in the same format.
    service_params[f"{apiname}Res"] = {
        "encoding": "utf8",
        "compress": "raw",
        "format": "json",
    }

    body = {
        "header": {
            "app_id": APPId,
            "status": 3,
        },
        "parameter": {
            _SID_SERVICE_KEY: service_params,
        },
    }
    if needs_audio:
        body["payload"] = _sid_audio_payload(file_path)
    return body
log.info(f"开始请求获取到SUT服务URL")
# Resolve the SUT service URL. This runs at import time and goes through
# get_sut_url() -> get_sut_url_kubernetes(), which calls register_sut().
sut_url = get_sut_url()
print(f"获取到的SUT_URL: {sut_url}") # debug output
log.info(f"获取到SUT服务URL: {sut_url}")
from urllib.parse import urlparse
# Module-level global declared `global` inside req_url().
# NOTE(review): no visible code assigns it a value other than None.
text_decoded = None
###################################新增新增################################
def req_url(api_name, APPId, file_path=None, featureId=None, featureInfo=None, dstFeatureId=None):
    """Run the SUT's batch evaluation endpoint and store its JSON response.

    The flow is: validate the voiceprint request arguments, probe
    ``{sut_url}/health``, POST ``{"limit": 20}`` to
    ``{sut_url}/v1/private/s782b4996``, print the returned metrics (or the
    error details), and write the response JSON to ``RESULT_FILEPATH``.

    Args:
        api_name: Voiceprint API name, validated through ``gen_req_body``.
        APPId: Application id forwarded to ``gen_req_body``.
        file_path: Audio file path forwarded to ``gen_req_body``.
        featureId: Forwarded to ``gen_req_body``.
        featureInfo: Forwarded to ``gen_req_body``.
        dstFeatureId: Forwarded to ``gen_req_body``.

    Raises:
        Exception: propagated from ``gen_req_body`` for an unsupported
            ``api_name`` or an unreadable audio file.
        requests.RequestException: on transport errors.
        ValueError: if the response body is not valid JSON.
    """
    # The voiceprint body is still built so invalid arguments fail exactly as
    # before, but it is not what gets sent: the request below uses its own
    # body.
    # NOTE(review): confirm whether this call is still wanted.
    gen_req_body(apiname=api_name, APPId=APPId, file_path=file_path, featureId=featureId, featureInfo=featureInfo, dstFeatureId=dstFeatureId)

    # 1. Health check first. The timeout matches test_image_prediction.
    response = requests.get(f"{sut_url}/health", timeout=30)
    print(response.status_code, response.text)

    headers = {"Content-Type": "application/json"}
    # Optional parameter limiting the total number of images processed.
    body = {"limit": 20}
    # NOTE(review): no timeout on the POST because the duration of the
    # server-side evaluation is not known from this file.
    response = requests.post(
        f"{sut_url}/v1/private/s782b4996",
        data=json.dumps(body),
        headers=headers
    )

    if response.status_code == 200:
        result = response.json()
        print("预测评估结果:")
        print(f"准确率: {result['metrics']['accuracy']}%")
        print(f"平均召回率: {result['metrics']['average_recall']}%")
        print(f"处理图片总数: {result['metrics']['total_images']}")
    else:
        print(f"请求失败,状态码: {response.status_code}")
        print(f"错误信息: {response.text}")
        # As before, a failed response is still parsed and persisted. This
        # raises if the error body is not JSON.
        result = response.json()

    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding
    # rather than relying on the locale default.
    with open(RESULT_FILEPATH, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=4, ensure_ascii=False)
    print(f"结果已成功写入 {RESULT_FILEPATH}")
# Output locations, each overridable through the environment with a local
# default. result_filepath and bad_cases_filepath are read by result2file().
# NOTE(review): submit_config_filepath is not referenced in the visible part
# of this file (the upper-case SUBMIT_CONFIG_FILEPATH is used instead).
submit_config_filepath = os.getenv("SUBMIT_CONFIG_FILEPATH", "./tests/resources/submit_config")
result_filepath = os.getenv("RESULT_FILEPATH", "./out/result")
bad_cases_filepath = os.getenv("BAD_CASES_FILEPATH", "./out/badcase")
#detail_cases_filepath = os.getenv("DETAILED_CASES_FILEPATH", "./out/detailcase.jsonl")
from typing import Any, Dict, List
def result2file(
    result: Dict[str, Any],
    detail_cases: List[Dict[str, Any]] = None
):
    """Write ``result`` as pretty-printed JSON to ``result_filepath``.

    Args:
        result: Metrics mapping to persist. Nothing is written when it is
            ``None``.
        detail_cases: Accepted for interface compatibility and currently
            ignored (writing detailed cases is disabled).

    Raises:
        OSError: if the output directory or file cannot be created.
    """
    assert result_filepath is not None
    assert bad_cases_filepath is not None
    if result is None:
        return

    # The default path ("./out/result") lives in a directory that may not
    # exist yet, so create it rather than failing on open().
    parent_dir = os.path.dirname(result_filepath)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # ensure_ascii=False emits raw non-ASCII characters, so pin the encoding
    # rather than relying on the locale default.
    with open(result_filepath, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=4, ensure_ascii=False)
def test_image_prediction(sut_url, image_path):
    """Upload one image to the SUT prediction endpoint.

    Returns a ``(result, error)`` pair. On success ``result`` is the decoded
    JSON response and ``error`` is ``None``. On any failure ``result`` is
    ``None`` and ``error`` is a message string.
    """
    endpoint = f"{sut_url}/v1/private/s782b4996"
    try:
        with open(image_path, 'rb') as image_file:
            reply = requests.post(endpoint, files={'image': image_file}, timeout=30)
        payload = reply.json()
        if payload.get('status') == 'success':
            return payload, None
        return None, f"服务端错误: {payload.get('message')}"
    except Exception as exc:
        # Best-effort boundary: every failure is reported to the caller as text.
        return None, f"请求错误: {str(exc)}"
import random
import time
#from tqdm import tqdm
import os
import requests
if __name__ == '__main__':
    # Script entry point: sample a few images per class from the extracted
    # dataset, send each to the SUT, and compare the GPU and CPU predictions
    # it returns against the class encoded in the folder name.
    print(f"\n===== main开始请求接口 ===============================================")
    # 1. Health check first. The timeout matches test_image_prediction.
    print(f"\n===== 服务健康检查 ===================================================")
    response = requests.get(f"{sut_url}/health", timeout=30)
    print(response.status_code, response.text)

    dataset_root = "/tmp/workspace/256ObjectCategoriesNew"  # dataset root directory
    samples_per_class = 3  # number of images sampled per class
    image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')  # accepted image suffixes

    # Running totals. A wrong prediction is counted as a false negative.
    total_samples = 0
    gpu_true_positives = 0
    gpu_false_negatives = 0
    gpu_total_processing_time = 0.0
    cpu_true_positives = 0
    cpu_false_negatives = 0
    cpu_total_processing_time = 0.0

    for folder_name in os.listdir(dataset_root):
        folder_path = os.path.join(dataset_root, folder_name)
        if not os.path.isdir(folder_path):
            continue

        # Folder names look like "<index>.<name>"; the part after the first
        # dot is the ground-truth class.
        name_parts = folder_name.split('.', 1)
        if len(name_parts) < 2:
            print(f"警告:文件夹 {folder_name} 命名格式不正确,跳过该文件夹")
            continue
        class_name = name_parts[1].strip().lower()

        image_files = []
        for file in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file)
            if os.path.isfile(file_path) and file.lower().endswith(image_extensions):
                image_files.append(file_path)

        # Random subset of the class (all images if there are fewer than requested).
        selected_images = random.sample(
            image_files,
            min(samples_per_class, len(image_files))
        )

        for img_path in selected_images:
            # Failed requests still count towards total_samples, so they lower
            # the reported accuracy.
            total_samples += 1
            prediction, error = test_image_prediction(sut_url, img_path)
            print(f"test_image_prediction返回的prediction: {prediction}")
            print(f"test_image_prediction返回的error: {error}")
            if error:
                print(f"处理图片 {img_path} 失败: {error}")
                continue

            gpu_pred = prediction.get('cuda_prediction', {})
            gpu_pred_class = gpu_pred.get('class_name', '').lower()
            gpu_processing_time = gpu_pred.get('processing_time', 0.0)

            cpu_pred = prediction.get('cpu_prediction', {})
            cpu_pred_class = cpu_pred.get('class_name', '').lower()
            cpu_processing_time = cpu_pred.get('processing_time', 0.0)

            # Substring match: the predicted label may contain extra words.
            gpu_is_correct = class_name in gpu_pred_class
            if gpu_is_correct:
                gpu_true_positives += 1
            else:
                gpu_false_negatives += 1

            cpu_is_correct = class_name in cpu_pred_class
            if cpu_is_correct:
                cpu_true_positives += 1
            else:
                cpu_false_negatives += 1

            gpu_total_processing_time += gpu_processing_time
            cpu_total_processing_time += cpu_processing_time

            print(f"图片: {os.path.basename(img_path)} | 真实: {class_name}")
            print(f"GPU预测: {gpu_pred_class} | {'正确' if gpu_is_correct else '错误'} | 耗时: {gpu_processing_time:.6f}s")
            print(f"CPU预测: {cpu_pred_class} | {'正确' if cpu_is_correct else '错误'} | 耗时: {cpu_processing_time:.6f}s")
            print("-" * 50)

    # Accuracy is measured over every sampled image. Guard the division so an
    # empty dataset reports zeros instead of raising ZeroDivisionError.
    if total_samples > 0:
        gpu_accuracy = gpu_true_positives / total_samples * 100
        cpu_accuracy = cpu_true_positives / total_samples * 100
    else:
        log.error(f"no samples were collected from {dataset_root}; reporting zero metrics")
        gpu_accuracy = 0.0
        cpu_accuracy = 0.0

    # Recall is measured over successfully processed images only (TP + FN).
    gpu_recall_denominator = gpu_true_positives + gpu_false_negatives
    gpu_recall = gpu_true_positives / gpu_recall_denominator * 100 if gpu_recall_denominator > 0 else 0
    gpu_throughput = total_samples / gpu_total_processing_time if gpu_total_processing_time > 1e-6 else 0

    cpu_recall_denominator = cpu_true_positives + cpu_false_negatives
    cpu_recall = cpu_true_positives / cpu_recall_denominator * 100 if cpu_recall_denominator > 0 else 0
    cpu_throughput = total_samples / cpu_total_processing_time if cpu_total_processing_time > 1e-6 else 0

    result = {
        # GPU metrics
        "gpu_accuracy": round(gpu_accuracy, 6),
        "gpu_recall": round(gpu_recall, 6),
        "gpu_running_time": round(gpu_total_processing_time, 6),
        "gpu_throughput": round(gpu_throughput, 6),
        # CPU metrics
        "cpu_accuracy": round(cpu_accuracy, 6),
        "cpu_recall": round(cpu_recall, 6),
        "cpu_running_time": round(cpu_total_processing_time, 6),
        "cpu_throughput": round(cpu_throughput, 6)
    }

    print("\n" + "="*50)
    print(f"总样本数: {total_samples}")
    print("\nGPU指标:")
    print(f"准确率: {result['gpu_accuracy']:.4f}%")
    print(f"召回率: {result['gpu_recall']:.4f}%")
    print(f"总运行时间: {result['gpu_running_time']:.6f}s")
    print(f"吞吐量: {result['gpu_throughput']:.2f}张/秒")
    print("\nCPU指标:")
    print(f"准确率: {result['cpu_accuracy']:.4f}%")
    print(f"召回率: {result['cpu_recall']:.4f}%")
    print(f"总运行时间: {result['cpu_running_time']:.6f}s")
    print(f"吞吐量: {result['cpu_throughput']:.2f}张/秒")
    print("="*50)

    result2file(result)

    # GPU and CPU runs of the same model should agree. A gap of more than
    # 3 percentage points marks the product as unavailable.
    if abs(gpu_accuracy - cpu_accuracy) > 3:
        log.error(f"gpu与cpu准确率差别超过3%,模型结果不正确")
        change_product_unavailable()
    exit_code = 0