push to main
This commit is contained in:
12
Dockerfile_bi100
Normal file
12
Dockerfile_bi100
Normal file
@@ -0,0 +1,12 @@
|
||||
|
||||
FROM zibo.harbor.iluvatar.com.cn:30000/saas/bi100-3.2.1-x86-ubuntu20.04-py3.10-poc-llm-infer:v1.2.2
|
||||
|
||||
WORKDIR /workspace/
|
||||
COPY ./model_test_caltech_http.py /workspace/
|
||||
COPY ./microsoft_beit_base_patch16_224_pt22k_ft22k /model
|
||||
|
||||
RUN ln -s $(which python3) /usr/bin/python
|
||||
|
||||
CMD ["python3", "model_test_caltech_http.py"]
|
||||
|
||||
|
||||
@@ -1,215 +0,0 @@
|
||||
import requests
|
||||
import json
|
||||
import torch
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
from transformers import BeitImageProcessor, BeitForImageClassification
|
||||
# 根据模型实际架构选择类
|
||||
from transformers import ViTForImageClassification, BeitForImageClassification
|
||||
from tqdm import tqdm
|
||||
from transformers import AutoConfig
|
||||
from transformers import AutoImageProcessor, AutoModelForImageClassification
|
||||
import os
|
||||
import random
|
||||
import time # 新增导入时间模块
|
||||
|
||||
# 支持 Iluvatar GPU 加速,若不可用则使用 CPU
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
print(f"当前使用的设备: {device}") # 添加调试信息
|
||||
|
||||
# 若有多块 GPU,可使用 DataParallel 进行并行计算
|
||||
if torch.cuda.device_count() > 1:
|
||||
print(f"使用 {torch.cuda.device_count()} 块 GPU 进行计算")
|
||||
|
||||
class COCOImageClassifier:
|
||||
def __init__(self, model_path: str, local_image_paths: list):
|
||||
"""初始化COCO图像分类器"""
|
||||
self.processor = AutoImageProcessor.from_pretrained(model_path)
|
||||
self.model = AutoModelForImageClassification.from_pretrained(model_path)
|
||||
|
||||
# 将模型移动到设备
|
||||
self.model = self.model.to(device)
|
||||
print(f"模型是否在 GPU 上: {next(self.model.parameters()).is_cuda}") # 添加调试信息
|
||||
|
||||
# 若有多块 GPU,使用 DataParallel
|
||||
if torch.cuda.device_count() > 1:
|
||||
self.model = torch.nn.DataParallel(self.model)
|
||||
|
||||
self.id2label = self.model.module.config.id2label if hasattr(self.model, 'module') else self.model.config.id2label
|
||||
self.local_image_paths = local_image_paths
|
||||
|
||||
def predict_image_path(self, image_path: str, top_k: int = 5) -> dict:
|
||||
"""
|
||||
预测本地图片文件对应的图片类别
|
||||
|
||||
Args:
|
||||
image_path: 本地图片文件路径
|
||||
top_k: 返回置信度最高的前k个类别
|
||||
|
||||
Returns:
|
||||
包含预测结果的字典
|
||||
"""
|
||||
try:
|
||||
# 打开图片
|
||||
image = Image.open(image_path).convert("RGB")
|
||||
|
||||
# 预处理
|
||||
inputs = self.processor(images=image, return_tensors="pt")
|
||||
|
||||
# 将输入数据移动到设备
|
||||
inputs = inputs.to(device)
|
||||
|
||||
# 模型推理
|
||||
with torch.no_grad():
|
||||
outputs = self.model(**inputs)
|
||||
|
||||
# 获取预测结果
|
||||
logits = outputs.logits
|
||||
probs = torch.nn.functional.softmax(logits, dim=1)
|
||||
top_probs, top_indices = probs.topk(top_k, dim=1)
|
||||
|
||||
# 整理结果
|
||||
predictions = []
|
||||
for i in range(top_k):
|
||||
class_idx = top_indices[0, i].item()
|
||||
confidence = top_probs[0, i].item()
|
||||
predictions.append({
|
||||
"class_id": class_idx,
|
||||
"class_name": self.id2label[class_idx],
|
||||
"confidence": confidence
|
||||
})
|
||||
|
||||
return {
|
||||
"image_path": image_path,
|
||||
"predictions": predictions
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理图片文件 {image_path} 时出错: {e}")
|
||||
return None
|
||||
|
||||
def batch_predict(self, limit: int = 20, top_k: int = 5) -> list:
|
||||
"""
|
||||
批量预测本地图片
|
||||
|
||||
Args:
|
||||
limit: 限制处理的图片数量
|
||||
top_k: 返回置信度最高的前k个类别
|
||||
|
||||
Returns:
|
||||
包含所有预测结果的列表
|
||||
"""
|
||||
results = []
|
||||
local_image_paths = self.local_image_paths[:limit]
|
||||
|
||||
print(f"开始预测 {len(local_image_paths)} 张本地图片...")
|
||||
start_time = time.time() # 记录开始时间
|
||||
for image_path in tqdm(local_image_paths):
|
||||
result = self.predict_image_path(image_path, top_k)
|
||||
if result:
|
||||
results.append(result)
|
||||
end_time = time.time() # 记录结束时间
|
||||
total_time = end_time - start_time # 计算总时间
|
||||
images_per_second = len(results) / total_time # 计算每秒处理的图片数量
|
||||
print(f"模型每秒可以处理 {images_per_second:.2f} 张图片")
|
||||
return results
|
||||
|
||||
def save_results(self, results: list, output_file: str = "caltech_predictions.json"):
|
||||
"""
|
||||
保存预测结果到JSON文件
|
||||
|
||||
Args:
|
||||
results: 预测结果列表
|
||||
output_file: 输出文件名
|
||||
"""
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(results, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"结果已保存到 {output_file}")
|
||||
|
||||
# 主程序
|
||||
if __name__ == "__main__":
|
||||
# 替换为本地模型路径
|
||||
LOCAL_MODEL_PATH = "/home/zhoushasha/models/microsoft_beit_base_patch16_224_pt22k_ft22k"
|
||||
|
||||
# 替换为Caltech 256数据集文件夹路径
|
||||
CALTECH_256_PATH = "/home/zhoushasha/models/256ObjectCategoriesNew"
|
||||
|
||||
local_image_paths = []
|
||||
true_labels = {}
|
||||
|
||||
# 遍历Caltech 256数据集中的每个文件夹
|
||||
for folder in os.listdir(CALTECH_256_PATH):
|
||||
folder_path = os.path.join(CALTECH_256_PATH, folder)
|
||||
if os.path.isdir(folder_path):
|
||||
# 获取文件夹名称中的类别名称
|
||||
class_name = folder.split('.', 1)[1]
|
||||
# 获取文件夹中的所有图片文件
|
||||
image_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.jpg', '.jpeg', '.png'))]
|
||||
# 随机选择3张图片
|
||||
selected_images = random.sample(image_files, min(3, len(image_files)))
|
||||
for image_path in selected_images:
|
||||
local_image_paths.append(image_path)
|
||||
true_labels[image_path] = class_name
|
||||
|
||||
# 创建分类器实例
|
||||
classifier = COCOImageClassifier(LOCAL_MODEL_PATH, local_image_paths)
|
||||
|
||||
# 批量预测
|
||||
results = classifier.batch_predict(limit=len(local_image_paths), top_k=3)
|
||||
|
||||
# 保存结果
|
||||
classifier.save_results(results)
|
||||
|
||||
# 打印简要统计
|
||||
print(f"\n处理完成: 成功预测 {len(results)} 张图片")
|
||||
if results:
|
||||
print("\n示例预测结果:")
|
||||
sample = results[0]
|
||||
print(f"图片路径: {sample['image_path']}")
|
||||
for i, pred in enumerate(sample['predictions'], 1):
|
||||
print(f"{i}. {pred['class_name']} (置信度: {pred['confidence']:.2%})")
|
||||
|
||||
correct_count = 0
|
||||
total_count = len(results)
|
||||
|
||||
# 统计每个类别的实际样本数和正确预测数
|
||||
class_actual_count = {}
|
||||
class_correct_count = {}
|
||||
|
||||
for prediction in results:
|
||||
image_path = prediction['image_path']
|
||||
top1_prediction = max(prediction['predictions'], key=lambda x: x['confidence'])
|
||||
predicted_class = top1_prediction['class_name'].lower()
|
||||
true_class = true_labels.get(image_path).lower()
|
||||
|
||||
# 统计每个类别的实际样本数
|
||||
if true_class not in class_actual_count:
|
||||
class_actual_count[true_class] = 0
|
||||
class_actual_count[true_class] += 1
|
||||
|
||||
# 检查预测类别中的每个单词是否包含真实标签
|
||||
words = predicted_class.split()
|
||||
for word in words:
|
||||
if true_class in word:
|
||||
correct_count += 1
|
||||
# 统计每个类别的正确预测数
|
||||
if true_class not in class_correct_count:
|
||||
class_correct_count[true_class] = 0
|
||||
class_correct_count[true_class] += 1
|
||||
break
|
||||
|
||||
accuracy = correct_count / total_count
|
||||
print(f"\nAccuracy: {accuracy * 100:.2f}%")
|
||||
|
||||
# 计算每个类别的召回率
|
||||
recall_per_class = {}
|
||||
for class_name in class_actual_count:
|
||||
if class_name in class_correct_count:
|
||||
recall_per_class[class_name] = class_correct_count[class_name] / class_actual_count[class_name]
|
||||
else:
|
||||
recall_per_class[class_name] = 0
|
||||
|
||||
# 计算平均召回率
|
||||
average_recall = sum(recall_per_class.values()) / len(recall_per_class)
|
||||
print(f"\nAverage Recall: {average_recall * 100:.2f}%")
|
||||
@@ -1,197 +0,0 @@
|
||||
import requests
|
||||
import json
|
||||
import torch
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
from transformers import AutoImageProcessor, AutoModelForImageClassification
|
||||
from tqdm import tqdm
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
|
||||
# 强制使用CPU
|
||||
device = torch.device("cpu")
|
||||
print(f"当前使用的设备: {device}")
|
||||
|
||||
class COCOImageClassifier:
|
||||
def __init__(self, model_path: str, local_image_paths: list):
|
||||
"""初始化COCO图像分类器"""
|
||||
self.processor = AutoImageProcessor.from_pretrained(model_path)
|
||||
self.model = AutoModelForImageClassification.from_pretrained(model_path)
|
||||
|
||||
# 将模型移动到CPU
|
||||
self.model = self.model.to(device)
|
||||
self.id2label = self.model.config.id2label
|
||||
self.local_image_paths = local_image_paths
|
||||
|
||||
def predict_image_path(self, image_path: str, top_k: int = 5) -> dict:
|
||||
"""
|
||||
预测本地图片文件对应的图片类别
|
||||
|
||||
Args:
|
||||
image_path: 本地图片文件路径
|
||||
top_k: 返回置信度最高的前k个类别
|
||||
|
||||
Returns:
|
||||
包含预测结果的字典
|
||||
"""
|
||||
try:
|
||||
# 打开图片
|
||||
image = Image.open(image_path).convert("RGB")
|
||||
|
||||
# 预处理
|
||||
inputs = self.processor(images=image, return_tensors="pt")
|
||||
|
||||
# 将输入数据移动到CPU
|
||||
inputs = inputs.to(device)
|
||||
|
||||
# 模型推理
|
||||
with torch.no_grad():
|
||||
outputs = self.model(**inputs)
|
||||
|
||||
# 获取预测结果
|
||||
logits = outputs.logits
|
||||
probs = torch.nn.functional.softmax(logits, dim=1)
|
||||
top_probs, top_indices = probs.topk(top_k, dim=1)
|
||||
|
||||
# 整理结果
|
||||
predictions = []
|
||||
for i in range(top_k):
|
||||
class_idx = top_indices[0, i].item()
|
||||
confidence = top_probs[0, i].item()
|
||||
predictions.append({
|
||||
"class_id": class_idx,
|
||||
"class_name": self.id2label[class_idx],
|
||||
"confidence": confidence
|
||||
})
|
||||
|
||||
return {
|
||||
"image_path": image_path,
|
||||
"predictions": predictions
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"处理图片文件 {image_path} 时出错: {e}")
|
||||
return None
|
||||
|
||||
def batch_predict(self, limit: int = 20, top_k: int = 5) -> list:
|
||||
"""
|
||||
批量预测本地图片
|
||||
|
||||
Args:
|
||||
limit: 限制处理的图片数量
|
||||
top_k: 返回置信度最高的前k个类别
|
||||
|
||||
Returns:
|
||||
包含所有预测结果的列表
|
||||
"""
|
||||
results = []
|
||||
local_image_paths = self.local_image_paths[:limit]
|
||||
|
||||
print(f"开始预测 {len(local_image_paths)} 张本地图片...")
|
||||
start_time = time.time()
|
||||
for image_path in tqdm(local_image_paths):
|
||||
result = self.predict_image_path(image_path, top_k)
|
||||
if result:
|
||||
results.append(result)
|
||||
end_time = time.time()
|
||||
|
||||
# 计算吞吐量
|
||||
throughput = len(results) / (end_time - start_time)
|
||||
print(f"模型每秒可以处理 {throughput:.2f} 张图片")
|
||||
|
||||
return results
|
||||
|
||||
def save_results(self, results: list, output_file: str = "celtech_cpu_predictions.json"):
|
||||
"""
|
||||
保存预测结果到JSON文件
|
||||
|
||||
Args:
|
||||
results: 预测结果列表
|
||||
output_file: 输出文件名
|
||||
"""
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(results, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"结果已保存到 {output_file}")
|
||||
|
||||
# 主程序
|
||||
if __name__ == "__main__":
|
||||
# 替换为本地模型路径
|
||||
LOCAL_MODEL_PATH = "/home/zhoushasha/models/microsoft_beit_base_patch16_224_pt22k_ft22k"
|
||||
|
||||
# 替换为Caltech 256数据集文件夹路径 New
|
||||
CALTECH_256_PATH = "/home/zhoushasha/models/256ObjectCategoriesNew"
|
||||
|
||||
local_image_paths = []
|
||||
true_labels = {}
|
||||
|
||||
# 遍历Caltech 256数据集中的每个文件夹
|
||||
for folder in os.listdir(CALTECH_256_PATH):
|
||||
folder_path = os.path.join(CALTECH_256_PATH, folder)
|
||||
if os.path.isdir(folder_path):
|
||||
# 获取文件夹名称中的类别名称
|
||||
class_name = folder.split('.', 1)[1]
|
||||
# 获取文件夹中的所有图片文件
|
||||
image_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.jpg', '.jpeg', '.png'))]
|
||||
# 随机选择3张图片
|
||||
selected_images = random.sample(image_files, min(3, len(image_files)))
|
||||
for image_path in selected_images:
|
||||
local_image_paths.append(image_path)
|
||||
true_labels[image_path] = class_name
|
||||
|
||||
# 创建分类器实例
|
||||
classifier = COCOImageClassifier(LOCAL_MODEL_PATH, local_image_paths)
|
||||
|
||||
# 批量预测
|
||||
results = classifier.batch_predict(limit=len(local_image_paths), top_k=3)
|
||||
|
||||
# 保存结果
|
||||
classifier.save_results(results)
|
||||
|
||||
# 打印简要统计
|
||||
print(f"\n处理完成: 成功预测 {len(results)} 张图片")
|
||||
if results:
|
||||
print("\n示例预测结果:")
|
||||
sample = results[0]
|
||||
print(f"图片路径: {sample['image_path']}")
|
||||
for i, pred in enumerate(sample['predictions'], 1):
|
||||
print(f"{i}. {pred['class_name']} (置信度: {pred['confidence']:.2%})")
|
||||
|
||||
correct_count = 0
|
||||
total_count = len(results)
|
||||
class_true_positives = {}
|
||||
class_false_negatives = {}
|
||||
|
||||
for prediction in results:
|
||||
image_path = prediction['image_path']
|
||||
top1_prediction = max(prediction['predictions'], key=lambda x: x['confidence'])
|
||||
predicted_class = top1_prediction['class_name'].lower()
|
||||
true_class = true_labels.get(image_path).lower()
|
||||
|
||||
if true_class not in class_true_positives:
|
||||
class_true_positives[true_class] = 0
|
||||
class_false_negatives[true_class] = 0
|
||||
|
||||
# 检查预测类别中的每个单词是否包含真实标签
|
||||
words = predicted_class.split()
|
||||
for word in words:
|
||||
if true_class in word:
|
||||
correct_count += 1
|
||||
class_true_positives[true_class] += 1
|
||||
break
|
||||
else:
|
||||
class_false_negatives[true_class] += 1
|
||||
|
||||
accuracy = correct_count / total_count
|
||||
print(f"\nAccuracy: {accuracy * 100:.2f}%")
|
||||
|
||||
# 计算召回率
|
||||
total_true_positives = 0
|
||||
total_false_negatives = 0
|
||||
for class_name in class_true_positives:
|
||||
total_true_positives += class_true_positives[class_name]
|
||||
total_false_negatives += class_false_negatives[class_name]
|
||||
|
||||
recall = total_true_positives / (total_true_positives + total_false_negatives)
|
||||
print(f"Recall: {recall * 100:.2f}%")
|
||||
@@ -1,163 +0,0 @@
|
||||
import requests
|
||||
import json
|
||||
import torch
|
||||
from PIL import Image
|
||||
from io import BytesIO
|
||||
from transformers import AutoImageProcessor, AutoModelForImageClassification
|
||||
from tqdm import tqdm
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
from flask import Flask, request, jsonify # 引入Flask
|
||||
|
||||
# 设备配置
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
print(f"当前使用的设备: {device}")
|
||||
|
||||
class COCOImageClassifier:
|
||||
def __init__(self, model_path: str):
|
||||
"""初始化分类器(移除local_image_paths参数,改为动态接收)"""
|
||||
self.processor = AutoImageProcessor.from_pretrained(model_path)
|
||||
self.model = AutoModelForImageClassification.from_pretrained(model_path)
|
||||
self.model = self.model.to(device)
|
||||
|
||||
if torch.cuda.device_count() > 1:
|
||||
print(f"使用 {torch.cuda.device_count()} 块GPU")
|
||||
self.model = torch.nn.DataParallel(self.model)
|
||||
|
||||
self.id2label = self.model.module.config.id2label if hasattr(self.model, 'module') else self.model.config.id2label
|
||||
|
||||
def predict_image_path(self, image_path: str, top_k: int = 5) -> dict:
|
||||
"""预测单张图片(复用原逻辑)"""
|
||||
try:
|
||||
image = Image.open(image_path).convert("RGB")
|
||||
inputs = self.processor(images=image, return_tensors="pt").to(device)
|
||||
|
||||
with torch.no_grad():
|
||||
outputs = self.model(** inputs)
|
||||
|
||||
logits = outputs.logits
|
||||
probs = torch.nn.functional.softmax(logits, dim=1)
|
||||
top_probs, top_indices = probs.topk(top_k, dim=1)
|
||||
|
||||
predictions = []
|
||||
for i in range(top_k):
|
||||
class_idx = top_indices[0, i].item()
|
||||
predictions.append({
|
||||
"class_id": class_idx,
|
||||
"class_name": self.id2label[class_idx],
|
||||
"confidence": top_probs[0, i].item()
|
||||
})
|
||||
|
||||
return {
|
||||
"image_path": image_path,
|
||||
"predictions": predictions
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"处理图片 {image_path} 出错: {e}")
|
||||
return None
|
||||
|
||||
def batch_predict_and_evaluate(self, image_paths: list, true_labels: dict, top_k: int = 3) -> dict:
|
||||
"""批量预测并计算准确率、召回率"""
|
||||
results = []
|
||||
start_time = time.time()
|
||||
|
||||
for image_path in tqdm(image_paths):
|
||||
result = self.predict_image_path(image_path, top_k)
|
||||
if result:
|
||||
results.append(result)
|
||||
|
||||
end_time = time.time()
|
||||
total_time = end_time - start_time
|
||||
images_per_second = len(results) / total_time if total_time > 0 else 0
|
||||
|
||||
# 计算准确率和召回率(复用原逻辑)
|
||||
correct_count = 0
|
||||
total_count = len(results)
|
||||
class_actual_count = {}
|
||||
class_correct_count = {}
|
||||
|
||||
for prediction in results:
|
||||
image_path = prediction['image_path']
|
||||
top1_prediction = max(prediction['predictions'], key=lambda x: x['confidence'])
|
||||
predicted_class = top1_prediction['class_name'].lower()
|
||||
true_class = true_labels.get(image_path, "").lower()
|
||||
|
||||
# 统计每个类别的实际样本数
|
||||
class_actual_count[true_class] = class_actual_count.get(true_class, 0) + 1
|
||||
|
||||
# 检查预测是否正确
|
||||
words = predicted_class.split()
|
||||
for word in words:
|
||||
if true_class in word:
|
||||
correct_count += 1
|
||||
class_correct_count[true_class] = class_correct_count.get(true_class, 0) + 1
|
||||
break
|
||||
|
||||
# 计算指标
|
||||
accuracy = correct_count / total_count if total_count > 0 else 0
|
||||
recall_per_class = {}
|
||||
for class_name in class_actual_count:
|
||||
recall_per_class[class_name] = class_correct_count.get(class_name, 0) / class_actual_count[class_name]
|
||||
|
||||
average_recall = sum(recall_per_class.values()) / len(recall_per_class) if recall_per_class else 0
|
||||
|
||||
# 返回包含指标的结果
|
||||
return {
|
||||
"status": "success",
|
||||
"metrics": {
|
||||
"accuracy": round(accuracy * 100, 2), # 百分比
|
||||
"average_recall": round(average_recall * 100, 2), # 百分比
|
||||
"total_images": total_count,
|
||||
"correct_predictions": correct_count,
|
||||
"speed_images_per_second": round(images_per_second, 2)
|
||||
},
|
||||
"sample_predictions": results[:3] # 示例预测结果(可选)
|
||||
}
|
||||
|
||||
# 初始化Flask服务
|
||||
app = Flask(__name__)
|
||||
MODEL_PATH = os.environ.get("MODEL_PATH", "/model") # 容器内模型路径
|
||||
DATASET_PATH = os.environ.get("DATASET_PATH", "/app/dataset") # 容器内数据集路径
|
||||
classifier = COCOImageClassifier(MODEL_PATH)
|
||||
|
||||
@app.route('/v1/private/s782b4996', methods=['POST'])
|
||||
def evaluate():
|
||||
"""接收请求并返回评估结果(准确率、召回率等)"""
|
||||
try:
|
||||
# 解析请求参数(可选:允许动态指定limit等参数)
|
||||
data = request.get_json()
|
||||
limit = data.get("limit", 20) # 限制处理的图片数量
|
||||
|
||||
# 加载数据集(容器内路径)
|
||||
local_image_paths = []
|
||||
true_labels = {}
|
||||
for folder in os.listdir(DATASET_PATH):
|
||||
folder_path = os.path.join(DATASET_PATH, folder)
|
||||
if os.path.isdir(folder_path):
|
||||
class_name = folder.split('.', 1)[1]
|
||||
image_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.jpg', '.jpeg', '.png'))]
|
||||
selected_images = random.sample(image_files, min(3, len(image_files)))
|
||||
for image_path in selected_images:
|
||||
local_image_paths.append(image_path)
|
||||
true_labels[image_path] = class_name
|
||||
|
||||
# 限制处理数量
|
||||
local_image_paths = local_image_paths[:limit]
|
||||
|
||||
# 执行预测和评估
|
||||
result = classifier.batch_predict_and_evaluate(local_image_paths, true_labels, top_k=3)
|
||||
return jsonify(result)
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
}), 500
|
||||
|
||||
@app.route('/health', methods=['GET'])
|
||||
def health_check():
|
||||
return jsonify({"status": "healthy", "device": str(device)}), 200
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host='0.0.0.0', port=8000, debug=False)
|
||||
@@ -1,89 +0,0 @@
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers import AutoImageProcessor, AutoModelForImageClassification
|
||||
import os
|
||||
from flask import Flask, request, jsonify
|
||||
from io import BytesIO
|
||||
|
||||
# 设备配置
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
print(f"当前使用的设备: {device}")
|
||||
|
||||
class ImageClassifier:
|
||||
def __init__(self, model_path: str):
|
||||
# 获取模型路径下的第一个子目录(假设模型文件存放在这里)
|
||||
subdirs = [d for d in os.listdir(model_path) if os.path.isdir(os.path.join(model_path, d))]
|
||||
if not subdirs:
|
||||
raise ValueError(f"在 {model_path} 下未找到任何子目录,无法加载模型")
|
||||
|
||||
# 实际的模型文件路径
|
||||
actual_model_path = os.path.join(model_path, subdirs[0])
|
||||
print(f"加载模型从: {actual_model_path}")
|
||||
|
||||
self.processor = AutoImageProcessor.from_pretrained(actual_model_path)
|
||||
self.model = AutoModelForImageClassification.from_pretrained(actual_model_path)
|
||||
self.model = self.model.to(device)
|
||||
|
||||
if torch.cuda.device_count() > 1:
|
||||
print(f"使用 {torch.cuda.device_count()} 块GPU")
|
||||
self.model = torch.nn.DataParallel(self.model)
|
||||
|
||||
self.id2label = self.model.module.config.id2label if hasattr(self.model, 'module') else self.model.config.id2label
|
||||
|
||||
def predict_single_image(self, image) -> dict:
|
||||
"""预测单张图片,返回置信度最高的结果"""
|
||||
try:
|
||||
# 处理图片
|
||||
inputs = self.processor(images=image, return_tensors="pt").to(device)
|
||||
|
||||
with torch.no_grad():
|
||||
outputs = self.model(** inputs)
|
||||
|
||||
logits = outputs.logits
|
||||
probs = torch.nn.functional.softmax(logits, dim=1)
|
||||
# 获取置信度最高的预测结果
|
||||
max_prob, max_idx = probs.max(dim=1)
|
||||
class_idx = max_idx.item()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"top_prediction": {
|
||||
"class_id": class_idx,
|
||||
"class_name": self.id2label[class_idx],
|
||||
"confidence": max_prob.item()
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
}
|
||||
|
||||
# 初始化服务
|
||||
app = Flask(__name__)
|
||||
MODEL_PATH = os.environ.get("MODEL_PATH", "/model") # 模型根路径(环境变量或默认路径)
|
||||
classifier = ImageClassifier(MODEL_PATH)
|
||||
|
||||
@app.route('/v1/private/s782b4996', methods=['POST'])
|
||||
def predict_single():
|
||||
"""接收单张图片并返回最高置信度预测结果"""
|
||||
# 检查是否有图片上传
|
||||
if 'image' not in request.files:
|
||||
return jsonify({"status": "error", "message": "请求中未包含图片"}), 400
|
||||
|
||||
image_file = request.files['image']
|
||||
try:
|
||||
# 读取图片
|
||||
image = Image.open(BytesIO(image_file.read())).convert("RGB")
|
||||
# 预测
|
||||
result = classifier.predict_single_image(image)
|
||||
return jsonify(result)
|
||||
except Exception as e:
|
||||
return jsonify({"status": "error", "message": str(e)}), 500
|
||||
|
||||
@app.route('/health', methods=['GET'])
|
||||
def health_check():
|
||||
return jsonify({"status": "healthy", "device": str(device)}), 200
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host='0.0.0.0', port=8000, debug=False)
|
||||
@@ -1,80 +0,0 @@
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers import AutoImageProcessor, AutoModelForImageClassification
|
||||
import os
|
||||
from flask import Flask, request, jsonify
|
||||
from io import BytesIO
|
||||
|
||||
# 设备配置
|
||||
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
print(f"当前使用的设备: {device}")
|
||||
|
||||
class ImageClassifier:
|
||||
def __init__(self, model_path: str):
|
||||
self.processor = AutoImageProcessor.from_pretrained(model_path)
|
||||
self.model = AutoModelForImageClassification.from_pretrained(model_path)
|
||||
self.model = self.model.to(device)
|
||||
|
||||
if torch.cuda.device_count() > 1:
|
||||
print(f"使用 {torch.cuda.device_count()} 块GPU")
|
||||
self.model = torch.nn.DataParallel(self.model)
|
||||
|
||||
self.id2label = self.model.module.config.id2label if hasattr(self.model, 'module') else self.model.config.id2label
|
||||
|
||||
def predict_single_image(self, image) -> dict:
|
||||
"""预测单张图片,返回置信度最高的结果"""
|
||||
try:
|
||||
# 处理图片
|
||||
inputs = self.processor(images=image, return_tensors="pt").to(device)
|
||||
|
||||
with torch.no_grad():
|
||||
outputs = self.model(**inputs)
|
||||
|
||||
logits = outputs.logits
|
||||
probs = torch.nn.functional.softmax(logits, dim=1)
|
||||
# 获取置信度最高的预测结果
|
||||
max_prob, max_idx = probs.max(dim=1)
|
||||
class_idx = max_idx.item()
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"top_prediction": {
|
||||
"class_id": class_idx,
|
||||
"class_name": self.id2label[class_idx],
|
||||
"confidence": max_prob.item()
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "error",
|
||||
"message": str(e)
|
||||
}
|
||||
|
||||
# 初始化服务
|
||||
app = Flask(__name__)
|
||||
MODEL_PATH = os.environ.get("MODEL_PATH", "/model") # 模型路径(环境变量或默认路径)
|
||||
classifier = ImageClassifier(MODEL_PATH)
|
||||
|
||||
@app.route('/v1/private/s782b4996', methods=['POST'])
|
||||
def predict_single():
|
||||
"""接收单张图片并返回最高置信度预测结果"""
|
||||
# 检查是否有图片上传
|
||||
if 'image' not in request.files:
|
||||
return jsonify({"status": "error", "message": "请求中未包含图片"}), 400
|
||||
|
||||
image_file = request.files['image']
|
||||
try:
|
||||
# 读取图片
|
||||
image = Image.open(BytesIO(image_file.read())).convert("RGB")
|
||||
# 预测
|
||||
result = classifier.predict_single_image(image)
|
||||
return jsonify(result)
|
||||
except Exception as e:
|
||||
return jsonify({"status": "error", "message": str(e)}), 500
|
||||
|
||||
@app.route('/health', methods=['GET'])
|
||||
def health_check():
|
||||
return jsonify({"status": "healthy", "device": str(device)}), 200
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(host='0.0.0.0', port=80, debug=False)
|
||||
1193
run_callback_cuda.py
1193
run_callback_cuda.py
File diff suppressed because it is too large
Load Diff
1296
run_callback_new.py
1296
run_callback_new.py
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user