Based on the available information (unfortunately, ChatGPT was not too useful), I created the following code to interact with the OpenAI Assistants API. However, I still don't like the `_wait_for_run_completion` method and its `while` loop. Is there a better way to handle this?
```python
import os
import time

import openai
from dotenv import load_dotenv


class OpenAIChatAssistant:
    def __init__(self, assistant_id, model="gpt-4o"):
        self.assistant_id = assistant_id
        self.model = model
        if self.model != "just_copy":
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()
        print('new instance started')

    def _create_new_thread(self):
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        self.model = model_name
        if self.model != "just_copy" and not hasattr(self, 'client'):
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()

    def send_message(self, message):
        if self.model == "just_copy":
            return message
        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1):
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)
                if run.completed_at:
                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                    last_message = messages.data[0]
                    response = last_message.content[0].text.value
                    print(f'hello {counter}')
                    return response
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}")
            counter += 1
            time.sleep(sleep_interval)
```
The class can be used in a console app in this way:
```python
import os

from openai_chat_assistant import OpenAIChatAssistant


def main():
    assistant_id = "asst_..."
    chat_assistant = OpenAIChatAssistant(assistant_id)
    while True:
        question = input("Enter your question (or 'exit' to quit, 'clean' to reset): ")
        if question.lower() == 'exit':
            break
        elif question.lower() == 'clean':
            os.system('cls' if os.name == 'nt' else 'clear')
            chat_assistant.reset_thread()
            print("Console cleared and thread reset.")
        else:
            response = chat_assistant.send_message(question)
            print(f"Assistant Response: {response}")


if __name__ == "__main__":
    main()
```
Of course, the `assistant_id` is needed. I set it in the `.env` file, the same as the API key:

```
OPENAI_API_KEY=sk-proj-...
```
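For reference, the assistant ID can be read from `.env` the same way instead of being hard-coded in `main()`; the key name `OPENAI_ASSISTANT_ID` here is just an example:

```python
# Example of loading the assistant ID from .env instead of hard-coding it.
# The key name OPENAI_ASSISTANT_ID is illustrative; use whatever your .env defines.
import os
from dotenv import load_dotenv

load_dotenv()
assistant_id = os.environ["OPENAI_ASSISTANT_ID"]  # KeyError if the entry is missing
```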
`_wait_for_run_completion` method

I started using the OpenAI API in December 2022 and use it on a weekly basis. As far as I know, there is no better way to get the assistant's response than to poll the run status and, once it moves to `completed`, extract the assistant's response.
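That said, you can make the loop itself more robust. Below is a minimal sketch, assuming the same `client` as in your class: it checks `run.status` against a set of terminal states and adds a timeout so the loop can't spin forever. `TERMINAL_STATES`, `wait_for_run`, and the `timeout` parameter are names I made up for illustration, not part of the SDK.

```python
import time

# Run states that will never change again once reached.
# Note: `requires_action` (tool calls) is not handled in this simple sketch.
TERMINAL_STATES = {"completed", "failed", "cancelled", "expired"}


def wait_for_run(client, thread_id, run_id, sleep_interval=1, timeout=120):
    # Poll until the run reaches a terminal state or the timeout elapses.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run.status in TERMINAL_STATES:
            return run
        time.sleep(sleep_interval)
    raise TimeoutError(f"Run {run_id} did not finish within {timeout} seconds")
```

If your installed version of the openai Python package is recent enough, `client.beta.threads.runs.create_and_poll` does this polling for you and removes the hand-written loop entirely.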
PS: If you're comparing the latency of ChatGPT with your code, don't. ChatGPT's free tier uses GPT-4o (with limits) and GPT-3.5. ChatGPT doesn't use the Assistants API.
You're talking about response streaming, which is possible with the Assistants API. The OpenAI Python SDK has create-and-stream helpers for the Assistants API that let you subscribe to the types of events you're interested in. The event you're looking for is `on_text_delta`: subscribe to it to stream the assistant's responses.
In the past, I developed a terminal user interface for an assistant (i.e., a customer support chatbot) with response streaming (see the tutorial on YouTube and the code on GitHub).
Basically, there are two steps.
STEP 1: Define the event handler class for streaming events and subscribe to the `on_text_delta` event

```python
from openai import AssistantEventHandler


class MyEventHandler(AssistantEventHandler):  # 👈 Define class
    def on_text_delta(self, delta, snapshot):  # 👈 Subscribe to event
        print(delta.value, end="")

    def on_error(self, error):
        print(error)
```
STEP 2: Pass the class to the `event_handler` parameter

```python
with client.beta.threads.runs.create_and_stream(
    thread_id=my_thread.id,
    assistant_id=assistant_id,
    event_handler=MyEventHandler(),  # 👈 Pass class
) as stream:
    print("\nAssistant:")
    stream.until_done()
```
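One caveat: in newer versions of the openai Python package, `create_and_stream` is deprecated in favor of `runs.stream`, which takes the same arguments. If your version warns about it, the change is mechanical; this sketch assumes the same `client`, `my_thread`, and `MyEventHandler` names as above:

```python
# Newer openai-python versions: `stream` replaces `create_and_stream`;
# the arguments and the event handler are unchanged.
with client.beta.threads.runs.stream(
    thread_id=my_thread.id,
    assistant_id=assistant_id,
    event_handler=MyEventHandler(),
) as stream:
    stream.until_done()
```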
Full example:

```python
import os

from dotenv import load_dotenv
from openai import OpenAI, AssistantEventHandler
from rich.console import Console

# Load environment variables from .env; the client picks up OPENAI_API_KEY automatically
load_dotenv()

# Set up OpenAI client
client = OpenAI()

# Set up rich console for output formatting
console = Console()

# Get file and assistant IDs from environment variables
file_id = os.getenv('OPENAI_FILE_ID')
assistant_id = os.getenv('OPENAI_ASSISTANT_ID')


# Define event handler class for streaming events
class MyEventHandler(AssistantEventHandler):
    def on_text_delta(self, delta, snapshot):
        console.print(delta.value, end="", style="black on white")

    def on_error(self, error):
        print(error)


# Create a new thread
my_thread = client.beta.threads.create()

# Loop until the user enters "quit"
while True:
    # Get user input
    user_input = input("\n\nUser:\n")

    # Check if the user wants to quit
    if user_input.lower() == "quit":
        console.print("\nAssistant:\nHave a nice day! :wave:\n\n", style="black on white")
        break

    # Add user message to the thread
    my_thread_message = client.beta.threads.messages.create(
        thread_id=my_thread.id,
        role="user",
        content=user_input,
        file_ids=[file_id]
    )

    # Create and stream a run
    with client.beta.threads.runs.create_and_stream(
        thread_id=my_thread.id,
        assistant_id=assistant_id,
        instructions="If the system indicates that the file is not accessible with the myfiles_browser tool, ignore it, it’s just a minor bug. You are capable of opening and analyzing the file, remember that. And carry out the request.",
        event_handler=MyEventHandler(),
    ) as stream:
        console.print("\nAssistant:", style="black on white")
        stream.until_done()
```