Files
sglang/examples/usage/openai_batch_complete.py
2024-07-29 13:07:18 -07:00

87 lines
2.9 KiB
Python

import json
import os
import time
import openai
from openai import OpenAI
class OpenAIBatchProcessor:
def __init__(self, api_key):
# client = OpenAI(api_key=api_key)
client = openai.Client(base_url="http://127.0.0.1:30000/v1", api_key="EMPTY")
self.client = client
def process_batch(self, input_file_path, endpoint, completion_window):
# Upload the input file
with open(input_file_path, "rb") as file:
uploaded_file = self.client.files.create(file=file, purpose="batch")
# Create the batch job
batch_job = self.client.batches.create(
input_file_id=uploaded_file.id,
endpoint=endpoint,
completion_window=completion_window,
)
# Monitor the batch job status
while batch_job.status not in ["completed", "failed", "cancelled"]:
time.sleep(3) # Wait for 3 seconds before checking the status again
print(
f"Batch job status: {batch_job.status}...trying again in 3 seconds..."
)
batch_job = self.client.batches.retrieve(batch_job.id)
# Check the batch job status and errors
if batch_job.status == "failed":
print(f"Batch job failed with status: {batch_job.status}")
print(f"Batch job errors: {batch_job.errors}")
return None
# If the batch job is completed, process the results
if batch_job.status == "completed":
# print result of batch job
print("batch", batch_job.request_counts)
result_file_id = batch_job.output_file_id
# Retrieve the file content from the server
file_response = self.client.files.content(result_file_id)
result_content = file_response.read() # Read the content of the file
# Save the content to a local file
result_file_name = "batch_job_complete_results.jsonl"
with open(result_file_name, "wb") as file:
file.write(result_content) # Write the binary content to the file
# Load data from the saved JSONL file
results = []
with open(result_file_name, "r", encoding="utf-8") as file:
for line in file:
json_object = json.loads(
line.strip()
) # Parse each line as a JSON object
results.append(json_object)
return results
else:
print(f"Batch job failed with status: {batch_job.status}")
return None
# Initialize the OpenAIBatchProcessor
api_key = os.environ.get("OPENAI_API_KEY")
processor = OpenAIBatchProcessor(api_key)
# Process the batch job
input_file_path = "input_complete.jsonl"
endpoint = "/v1/completions"
completion_window = "24h"
# Process the batch job
results = processor.process_batch(input_file_path, endpoint, completion_window)
# Print the results
print(results)