I am using Python Flask app for chat over data. In the console I am getting streamable response directly from the OpenAI since I can enable streming with a flag streaming=True
The problem is, that I can't "forward" the stream or "show" the strem than in my API call.
Code for the processing OpenAI and chain is:
def askQuestion(self, collection_id, question):
collection_name = "collection-" + str(collection_id)
self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))
self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')
chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)
self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
result = self.chain({"question": question})
res_dict = {
"answer": result["answer"],
res_dict["source_documents"] = []
for source in result["source_documents"]:
"page_content": source.page_content,
"metadata": source.metadata
return res_dict
and the API route code:
@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
question = request.form["question"]
# response_generator = document_thread.askQuestion(collection_id, question)
# return jsonify(response_generator)
def stream(question):
completion = document_thread.askQuestion(collection_id, question)
for line in completion['answer']:
yield line
return app.response_class(stream_with_context(stream(question)))
I am testing my endpoint with curl and I am passing flag -N
to curl, so I should get the streamable response, if it is possible.
When I make API call first the endpoint is waiting to process the data (I can see in my terminal in VS code the streamable answer) and when finished, I get everything displayed in one go.
With the usage of threading
and callback
we can have a streaming response from flask API.
In flask API, you may create a queue to register tokens through langchain's callback.
class StreamingHandler(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs) -> None:
You may get
tokens from the same queue in your flask route.
from flask import Response, stream_with_context
import threading
def stream_output():
q = Queue()
def generate(rq: Queue):
# add your logic to prevent while loop
# to run indefinitely
while( ...):
yield rq.get()
callback_fn = StreamingHandler(q)
threading.Thread(target= askQuestion, args=(collection_id, question, callback_fn))
return Response(stream_with_context(generate(q))
In your langchain's ChatOpenAI
add the above custom callback StreamingHandler
self.llm = ChatOpenAI(
For reference: