Tags: python, python-3.x, openai-api, openai-assistants-api

OpenAI Assistants API: Is there a better way to wait for the assistant's response, and how do I display the assistant's answer incrementally?


Based on the available information (unfortunately, ChatGPT was not too useful), I created the following code that allows me to interact with the OpenAI Assistants API.

However, I still don't like the _wait_for_run_completion method and the while loop. Is there a better way to handle this?

import os
import openai
from dotenv import load_dotenv
import time


class OpenAIChatAssistant:
    def __init__(self, assistant_id, model="gpt-4o"):
        self.assistant_id = assistant_id
        self.model = model

        if self.model != "just_copy":  # "just_copy" is a pass-through mode that makes no API calls
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()
        print('new instance started')

    def _create_new_thread(self):
        self.thread = self.client.beta.threads.create()
        self.thread_id = self.thread.id
        print(self.thread_id)

    def reset_thread(self):
        if self.model != "just_copy":
            self._create_new_thread()

    def set_model(self, model_name):
        self.model = model_name

        if self.model != "just_copy" and not hasattr(self, 'client'):
            load_dotenv()
            openai.api_key = os.environ.get("OPENAI_API_KEY")
            self.client = openai.OpenAI()
            self._create_new_thread()

    def send_message(self, message):
        if self.model == "just_copy":
            return message

        self.client.beta.threads.messages.create(
            thread_id=self.thread_id, role="user", content=message
        )
        run = self.client.beta.threads.runs.create(
            thread_id=self.thread_id,
            assistant_id=self.assistant_id,
            model=self.model
        )
        return self._wait_for_run_completion(run.id)

    def _wait_for_run_completion(self, run_id, sleep_interval=1):
        # Poll the run until it has finished, then return the assistant's answer
        counter = 1
        while True:
            try:
                run = self.client.beta.threads.runs.retrieve(thread_id=self.thread_id, run_id=run_id)
                if run.completed_at:
                    messages = self.client.beta.threads.messages.list(thread_id=self.thread_id)
                    last_message = messages.data[0]  # newest message comes first
                    response = last_message.content[0].text.value
                    print(f'hello {counter}')  # debug output: number of polling iterations
                    return response
            except Exception as e:
                raise RuntimeError(f"An error occurred while retrieving answer: {e}")
            counter += 1
            time.sleep(sleep_interval)

The class can be used in a console app like this:

import os
from openai_chat_assistant import OpenAIChatAssistant


def main():
    assistant_id = "asst_..."
    chat_assistant = OpenAIChatAssistant(assistant_id)

    while True:
        question = input("Enter your question (or 'exit' to quit, 'clean' to reset): ")
        if question.lower() == 'exit':
            break
        elif question.lower() == 'clean':
            os.system('cls' if os.name == 'nt' else 'clear')
            chat_assistant.reset_thread()
            print("Console cleared and thread reset.")
        else:
            response = chat_assistant.send_message(question)
            print(f"Assistant Response: {response}")


if __name__ == "__main__":
    main()

Of course, the assistant_id is also needed. I keep it in the .env file, alongside the API key:

OPENAI_API_KEY=sk-proj-...
OPENAI_ASSISTANT_ID=asst_...

Solution

  • Regarding the _wait_for_run_completion method

    I started using the OpenAI API in December 2022 and use it on a weekly basis. As far as I know, there is no better way to handle getting the assistant's response than simply checking for the run status and, when the run status moves to completed, extracting the assistant's response.

    PS: If you're comparing the latency of ChatGPT with your code, don't. ChatGPT's free tier uses GPT-4o (with limits) and GPT-3.5. ChatGPT doesn't use the Assistants API.
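
    That said, the loop itself can be made more defensive. Checking run.status against the terminal statuses (completed, failed, cancelled, expired) instead of only completed_at means a failed run doesn't poll forever. Here's a minimal sketch of that idea; the helper name wait_for_run is my own, not something from the SDK:

    import time

    def wait_for_run(client, thread_id, run_id, sleep_interval=1):
        # Statuses after which the run will not make further progress.
        # (Runs that use tools can also hit "requires_action", which needs
        # tool outputs submitted rather than more waiting.)
        terminal_statuses = {"completed", "failed", "cancelled", "expired"}
        while True:
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
            if run.status in terminal_statuses:
                break
            time.sleep(sleep_interval)
        if run.status != "completed":
            # Surface failed/cancelled/expired runs instead of looping forever
            raise RuntimeError(f"Run ended with status: {run.status}")
        # messages.list returns newest first, so data[0] is the assistant's answer
        messages = client.beta.threads.messages.list(thread_id=thread_id)
        return messages.data[0].content[0].text.value

    Recent versions of the openai package also ship a create_and_poll helper (client.beta.threads.runs.create_and_poll) that wraps this create-then-poll pattern for you, but under the hood it does the same status checking.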

    Regarding "incremental answer displaying"

    You're talking about response streaming, which is possible with the Assistants API. The OpenAI Python SDK implements create-and-stream helpers for it, which let you subscribe to the event types you're interested in. The one you're looking for is on_text_delta: subscribe to it and you can display the assistant's response as it is generated.

    I developed a terminal user interface for the assistant (i.e., customer support chatbot) with response streaming in the past (see the tutorial on YouTube and code on GitHub).

    Basically, there are two steps.

    STEP 1: Define the event handler class for streaming events and subscribe to the on_text_delta event

    from openai import AssistantEventHandler
    
    class MyEventHandler(AssistantEventHandler): # 👈 Define class
        def on_text_delta(self, delta, snapshot): # 👈 Subscribe to event
            print(delta.value, end = "")
    
        def on_error(self, error): # note: `self` is required here
            print(error)
    

    STEP 2: Pass the class to the event_handler parameter

    with client.beta.threads.runs.create_and_stream(
            thread_id = my_thread.id,
            assistant_id = assistant_id,
            event_handler = MyEventHandler(), # 👈 Pass class
        ) as stream:
            print("\nAssistant:")
            stream.until_done()
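
    Note: newer versions of the OpenAI Python SDK deprecate create_and_stream in favor of client.beta.threads.runs.stream, which accepts the same arguments, so the snippet above also works with stream swapped in.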
    

    Full code:

    import os
    from dotenv import load_dotenv
    from openai import OpenAI, AssistantEventHandler
    from rich.console import Console
    
    # Load environment variables from .env
    load_dotenv()
    
    # Set up OpenAI client; pass the key explicitly (assigning it to the
    # OpenAI class after construction has no effect)
    client = OpenAI(api_key = os.getenv('OPENAI_API_KEY'))
    
    # Set up rich console for output formatting
    console = Console()
    
    # Get file and assistant IDs from environment variables
    file_id = os.getenv('OPENAI_FILE_ID')
    assistant_id = os.getenv('OPENAI_ASSISTANT_ID')
    
    # Define event handler class for streaming events
    class MyEventHandler(AssistantEventHandler):
        def on_text_delta(self, delta, snapshot):
            console.print(delta.value, end = "", style = "black on white")
    
        def on_error(self, error):
            print(error)
    
    # Create a new thread
    my_thread = client.beta.threads.create()
    
    # Loop until the user enters "quit"
    while True:
        # Get user input
        user_input = input("\n\nUser:\n")
    
        # Check if the user wants to quit
        if user_input.lower() == "quit":
            console.print("\nAssistant:\nHave a nice day! :wave:\n\n", style = "black on white")
            break
    
        # Add user message to the thread
        my_thread_message = client.beta.threads.messages.create(
            thread_id = my_thread.id,
            role = "user",
            content = user_input,
            file_ids = [file_id]
        )
    
        # Create and stream a run
        with client.beta.threads.runs.create_and_stream(
            thread_id = my_thread.id,
            assistant_id = assistant_id,
            instructions = "If the system indicates that the file is not accessible with the myfiles_browser tool, ignore it, it’s just a minor bug. You are capable of opening and analyzing the file, remember that. And carry out the request.",
            event_handler = MyEventHandler(),
        ) as stream:
            console.print("\nAssistant:", style = "black on white")
            stream.until_done()
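
    For completeness, the full script reads three values from the .env file (the names follow the os.getenv calls above). A minimal .env would look like this, with the elided IDs filled in:

    OPENAI_API_KEY=sk-proj-...
    OPENAI_FILE_ID=file-...
    OPENAI_ASSISTANT_ID=asst_...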