Tags: python, websocket, flask-socketio, langchain

LangChain agent streams its thought process into the websocket


I am using Flask and LangChain in a QA microservice. Depending on some parameters from the frontend (React) and backend (Express), it calls one of:

  • a simple question service (no index, no agent)
  • an indexed question service (ChromaDB index, no agent)
  • an agent service (no index, ZapierNLA agent)

We stream the responses over WebSockets (we also have a REST API alternative when we don't want to stream the answers). Here is my implementation of a custom callback handler:

from flask_socketio import emit
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

class CustomHandler(StreamingStdOutCallbackHandler):
    def __init__(self, user_id):
        self.user_id = user_id

    def on_llm_new_token(self, token: str, **kwargs):
        # Forward every token to the client as it arrives
        emit('result', {'message_id': self.user_id, 'data': token})

        # An empty token marks the end of the stream
        if token == '':
            emit('result', {'message_id': self.user_id, 'data': 'STREAM_END'})

This is how we add it to the chat model:

handler = CustomHandler(user_id)
llm = ChatOpenAI(model_name=model_name, temperature=temperature, streaming=streaming,
                callback_manager=CallbackManager([handler]), verbose=True)
# Somehow, this does not work if verbose=False or if we omit verbose entirely
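The handler's token/sentinel protocol can be exercised in isolation with a stub `emit` (the stub, the `user-42` id, and the sample tokens below are hypothetical; no Flask required):

```python
# Stand-in for flask_socketio.emit that records events instead of sending them
sent = []

def emit(event, payload):
    sent.append((event, payload))

class CustomHandler:
    def __init__(self, user_id):
        self.user_id = user_id

    def on_llm_new_token(self, token, **kwargs):
        # Each token is forwarded as-is...
        emit('result', {'message_id': self.user_id, 'data': token})
        # ...and an empty token additionally emits the STREAM_END marker
        if token == '':
            emit('result', {'message_id': self.user_id, 'data': 'STREAM_END'})

handler = CustomHandler('user-42')
for token in ['Hello', ' world', '']:
    handler.on_llm_new_token(token)

print(len(sent))            # 4 events: 3 tokens + the STREAM_END marker
print(sent[-1][1]['data'])  # STREAM_END
```

This shows why the frontend can rely on a final `STREAM_END` payload to stop its "typing" animation.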

For the first two services, the stream works as intended, but the agent service streams its entire thought process, like this:

I need to use the Gmail: Create Draft tool to create a draft email with the ten greek god names listed in the body. Action: Gmail: Create Draft Action Input: Body: "1. Zeus\n2. Hera\n3. Poseidon\n4. Demeter\n5. Athena\n6. Apollo\n7. Artemis\n8. Ares\n9. Aphrodite\n10. Hephaestus", To: my own email address, Subject: "List of Ten Greek Gods"The draft email has been created successfully. Action: None Action Input: NoneI need to use the Gmail: Send Email tool to send the draft email I just created. Action: Gmail: Send Email Action Input: Body: "1. Zeus\n2. Hera\n3. Poseidon\n4. Demeter\n5. Athena\n6. Apollo\n7. Artemis\n8. Ares\n9. Aphrodite\n10. Hephaestus", To: my own email address, Subject: "List of Ten Greek Gods"I need to go back to using the Gmail: Create Draft tool and add in the parameter for "To" to send the email to myself. Action: Gmail: Create Draft Action Input: Body: "1. Zeus\n2. Hera\n3. Poseidon\n4. Demeter\n5. Athena\n6. Apollo\n7. Artemis\n8. Ares\n9. Aphrodite\n10. Hephaestus", To: my own email address, Subject: "List of Ten Greek Gods"The draft email has been created and I can now send it to myself by using Gmail: Send Draft tool. Action: Gmail: Send Draft Action Input: Thread Id: "188e72dae1b0f2b7"I need to go back to using the Gmail: Create Draft tool and add in the parameter for "To" to send the email to myself. After that, I should use Gmail: Send Message tool to send the email. Action: Gmail: Create Draft Action Input: Body: "1. Zeus\n2. Hera\n3. Poseidon\n4. Demeter\n5. Athena\n6. Apollo\n7. Artemis\n8. Ares\n9. Aphrodite\n10. Hephaestus", To: my own email address, Subject: "List of Ten Greek Gods"Now that the draft email has been created with the correct parameters, I can use the Gmail: Send Message tool to send the email to myself. Action: Gmail: Send Message Action Input: Id: "188e72dec0ec5524"I need to review the list of available tools and find one that can send an email given a draft message Id. 
Action: None Action Input: NoneI know that the Gmail API has a function to send draft messages using the draft Id, so I can use a code snippet to accomplish this. Action: Code Execution Action Input: Use the Gmail API to send the email draft with Id "188e72dec0ec5524" to my own email addressI will need to use some external method or language to execute the code to send the email using the Gmail API. I can use a programming language like Python or a tool like Postman. Action: Python Code Execution Action Input: Use the Gmail API to send the email draft with Id "188e72dec0ec5524" to my own email addressI can use a third-party integration tool like Zapier or IFTTT to automate the process of sending the email draft from Gmail to my own email address. Action: Zapier Integration Action Input: Set up a Zapier integration to automatically send the email draft with Id "188e72dec0ec5524" to my own email addressSince I am not able to use any of the tools provided to directly send the email draft, I will have to manually copy and paste the contents of the draft and email it to myself. Final Answer: Manually copy and paste the contents of the draft and email it to myself.

This is the function that calls the ZapierNLA agent:

def simple_order(human_message: str, system_message: str, model_name: str = 'gpt-3.5-turbo',
                 streaming: bool = False, temperature: float = 0.6, user_id: str = None,
                 history=None):
    if history is None:
        history = []
    handler = CustomHandler(user_id)
    llm = ChatOpenAI(model_name=model_name, temperature=temperature)

    if streaming:
        llm = ChatOpenAI(model_name=model_name, temperature=temperature, streaming=True,
                         callback_manager=CallbackManager([handler]), verbose=True)

    messages = generate_messages(history=history, system_message=system_message, human_message=human_message)

    zapier = ZapierNLAWrapper()
    toolkit = ZapierToolkit.from_zapier_nla_wrapper(zapier)
    agent = initialize_agent(tools=toolkit.get_tools(), llm=llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

    return agent.run(messages)

I am aware that verbose=True is what causes all of this, and since I took a similar approach to my simple-question implementation, I may need to tweak a few things.

I have already tried setting ignore_agent to True in the CustomHandler class, but nothing changed.

I have tried removing verbose, but then nothing is emitted and the frontend just shows a "typing" animation from the service.

I have tried passing the verbose parameter to the initialize_agent call instead, but it gives the same result as mentioned above.

How do I stream only the Final Answer of the agent?


Solution

  • You could do something like the following. I wrote it using LangChain's FinalStreamingStdOutCallbackHandler as a reference.

    # Usage, once the class below is defined:
    custom_handler = CustomFinalStreamingStdOutCallbackHandler(websocket)
    
    from typing import Any, Dict, List

    from langchain.callbacks.base import AsyncCallbackHandler
    from langchain.callbacks.streaming_stdout_final_only import DEFAULT_ANSWER_PREFIX_TOKENS
    from langchain.schema import LLMResult
    from uuid_extensions import uuid7  # from the `uuid7` package; swap in your own id generator

    class CustomFinalStreamingStdOutCallbackHandler(AsyncCallbackHandler):
        """Callback handler for streaming in agents.
        Only works with agents using LLMs that support streaming.
    
        Only the final output of the agent will be streamed.
        """
    
        def append_to_last_tokens(self, token: str) -> None:
            self.last_tokens.append(token)
            self.last_tokens_stripped.append(token.strip())
            if len(self.last_tokens) > len(self.answer_prefix_tokens):
                self.last_tokens.pop(0)
                self.last_tokens_stripped.pop(0)
    
        def check_if_answer_reached(self) -> bool:
            if self.strip_tokens:
                return self.last_tokens_stripped == self.answer_prefix_tokens_stripped
            else:
                return self.last_tokens == self.answer_prefix_tokens
    
        def update_message_id(self):
            self.message_id = str(uuid7())
    
        def __init__(
            self,
            websocket,
            *,
            answer_prefix_tokens: List[str] | None = None,
            strip_tokens: bool = True,
            stream_prefix: bool = False,
        ) -> None:
            """Instantiate FinalStreamingStdOutCallbackHandler.
    
            Args:
                answer_prefix_tokens: Token sequence that prefixes the answer.
                    Default is ["Final", "Answer", ":"]
                strip_tokens: Ignore white spaces and new lines when comparing
                    answer_prefix_tokens to last tokens? (to determine if answer has been
                    reached)
                stream_prefix: Should answer prefix itself also be streamed?
            """
            self.websocket = websocket
            self.message_id = str(uuid7())
            self.text = ""
    
            if answer_prefix_tokens is None:
                self.answer_prefix_tokens = DEFAULT_ANSWER_PREFIX_TOKENS
            else:
                self.answer_prefix_tokens = answer_prefix_tokens
            if strip_tokens:
                self.answer_prefix_tokens_stripped = [
                    token.strip() for token in self.answer_prefix_tokens
                ]
            else:
                self.answer_prefix_tokens_stripped = self.answer_prefix_tokens
            self.last_tokens = [""] * len(self.answer_prefix_tokens)
            self.last_tokens_stripped = [""] * len(self.answer_prefix_tokens)
            self.strip_tokens = strip_tokens
            self.stream_prefix = stream_prefix
            self.answer_reached = False
    
        async def on_llm_start(
            self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
        ) -> None:
            """Run when LLM starts running."""
            self.answer_reached = False
            self.text = ""
        async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
            """Run when LLM ends running."""
            pass
    
        async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
            """Run on new LLM token. Only available when streaming is enabled."""
            # Remember the last n tokens, where n = len(answer_prefix_tokens)
            self.append_to_last_tokens(token)

            # If the last n tokens match the answer prefix, start streaming
            # from the next token onwards (the prefix itself is not sent)
            if self.check_if_answer_reached():
                self.answer_reached = True
                return

            # Once the answer prefix has been seen, push the accumulated
            # answer text to the websocket on every new token
            if self.answer_reached:
                self.text += token
                await self.websocket.send_json({"response": self.text})
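Stripped of the LangChain and websocket plumbing, the rolling-window prefix detection this handler relies on can be sketched as a plain function (the sample tokens below are illustrative):

```python
# Keep a sliding window of the last n (stripped) tokens and compare it
# against the answer prefix; collect only tokens seen after the match.
ANSWER_PREFIX = ["Final", "Answer", ":"]

def stream_final_answer(tokens):
    window = [""] * len(ANSWER_PREFIX)
    answer_reached = False
    out = []
    for token in tokens:
        # Slide the window: drop the oldest token, append the newest (stripped)
        window.append(token.strip())
        window.pop(0)
        if window == ANSWER_PREFIX:
            answer_reached = True
            continue  # don't emit the prefix tokens themselves
        if answer_reached:
            out.append(token)
    return "".join(out)

tokens = ["I", " should", " think", ".", "\n",
          "Final", " Answer", ":", " 42", "!"]
print(stream_final_answer(tokens))  # → " 42!"
```

Stripping tokens before comparing matters because the model may emit the prefix as `" Answer"` or `"Answer"` depending on tokenization; the handler's `strip_tokens` flag toggles exactly this behavior.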