This is a LangChain + Flask implementation whose goal is to stream responses from the OpenAI server through LangChain to a web page, where JavaScript displays the response as it arrives.
I have tried every way I can think of to modify the code below so that it uses the LangChain ChatOpenAI class instead of OpenAI, without success. I am posting both implementations: the OpenAI one (working) and the ChatOpenAI one (failing). Thanks to the whole community and to anyone who can help me understand the problem; it would also be very helpful if you could show me how to solve it, since I have been trying for days and the error it reports tells me nothing.
Version that works, but where the library reports a deprecation warning:
from flask import Flask, Response
import threading
import queue

from langchain.llms import OpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)

@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
<div id="output"></div>
<script>
    const outputEl = document.getElementById('output');
    (async function() {
        try {
            const controller = new AbortController();
            const signal = controller.signal;
            const timeout = 120000;  // Set the timeout to 120 seconds
            setTimeout(() => controller.abort(), timeout);
            const response = await fetch('/chain', {method: 'POST', signal});
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            while (true) {
                const { done, value } = await reader.read();
                if (done) { break; }
                const text = decoder.decode(value, {stream: true});
                outputEl.innerHTML += text;
            }
        } catch (err) {
            console.error(err);
        }
    })();
</script>
</body>
</html>''', mimetype='text/html')

class ThreadedGenerator:
    # Bridges the LLM worker thread and Flask's streaming response:
    # the callback thread pushes tokens into a queue, the response
    # iterator pulls them out.
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        # Forward every streamed token to the generator feeding the response
        self.gen.send(token)

def llm_thread(g, prompt):
    try:
        llm = OpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("Create a poem about the meaning of life \n\n"), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
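(As far as I can tell, the deprecation warning appears because gpt-4 is a chat model, and LangChain suggests initializing it through ChatOpenAI rather than OpenAI; that is exactly what I am attempting below.)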
Version with the error (OpenAI replaced by ChatOpenAI):
from flask import Flask, Response
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import BaseCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

app = Flask(__name__)

@app.route('/')
def index():
    return Response('''<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
<div id="output"></div>
<script>
    const outputEl = document.getElementById('output');
    (async function() {
        try {
            const controller = new AbortController();
            const signal = controller.signal;
            const timeout = 120000;  // Set the timeout to 120 seconds
            setTimeout(() => controller.abort(), timeout);
            const response = await fetch('/chain', {method: 'POST', signal});
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            while (true) {
                const { done, value } = await reader.read();
                if (done) { break; }
                const text = decoder.decode(value, {stream: true});
                outputEl.innerHTML += text;
            }
        } catch (err) {
            console.error(err);
        }
    })();
</script>
</body>
</html>''', mimetype='text/html')

class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

    def on_chat_model_start(self, token: str):
        print("started")

def llm_thread(g, prompt):
    try:
        llm = ChatOpenAI(
            model_name="gpt-4",
            verbose=True,
            streaming=True,
            callback_manager=BaseCallbackManager([ChainStreamHandler(g)]),
            temperature=0.7,
        )
        llm(prompt)
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain("parlami dei 5 modi di dire in inglese che gli italiani conoscono meno \n\n"), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
Error shown in the console at startup and when I open the web page:
Error in ChainStreamHandler.on_chat_model_start callback: ChainStreamHandler.on_chat_model_start() got an unexpected keyword argument 'run_id'
Exception in thread Thread-4 (llm_thread):
127.0.0.1 - - [09/Sep/2023 18:09:29] "POST /chain HTTP/1.1" 200 -
Traceback (most recent call last):
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 300, in _handle_event
    getattr(handler, event_name)(*args, **kwargs)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\base.py", line 168, in on_chat_model_start
    raise NotImplementedError(
NotImplementedError: StdOutCallbackHandler does not implement `on_chat_model_start`

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Users\user22\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "c:\Users\user22\Desktop\Work\TESTPROJ\streamresp.py", line 90, in llm_thread
    llm(prompt)
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 552, in __call__
    generation = self.generate(
                 ^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\chat_models\base.py", line 293, in generate
    run_managers = callback_manager.on_chat_model_start(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 1112, in on_chat_model_start
    _handle_event(
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in _handle_event
    message_strings = [get_buffer_string(m) for m in args[1]]
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\callbacks\manager.py", line 304, in <listcomp>
    message_strings = [get_buffer_string(m) for m in args[1]]
                       ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user22\Desktop\Work\TESTPROJ\env\Lib\site-packages\langchain\schema\messages.py", line 52, in get_buffer_string
    raise ValueError(f"Got unsupported message type: {m}")
ValueError: Got unsupported message type: p
Thank you very much for the support!
Thanks to GitHub user python273, I've resolved it.
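Two separate things were wrong in the ChatOpenAI version. First, a chat model must be called with a list of message objects rather than a raw string: given a bare string, LangChain ends up iterating over it character by character, which is where "ValueError: Got unsupported message type: p" comes from ('p' is just the first character of the prompt). Second, my on_chat_model_start override only accepted a token argument, while LangChain invokes that callback with the serialized model, the messages, and keyword arguments such as run_id; simply removing the override is enough, since (as the _handle_event frame in the traceback shows) LangChain catches the resulting NotImplementedError and falls back to the plain-LLM callbacks. The working code below wraps the prompt in a HumanMessage and passes the handler via callbacks: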
import os
os.environ["OPENAI_API_KEY"] = ""

from flask import Flask, Response, request
import threading
import queue

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.schema import AIMessage, HumanMessage, SystemMessage

app = Flask(__name__)

@app.route('/')
def index():
    # just for the example, html is included directly, move to .html file
    return Response('''
<!DOCTYPE html>
<html>
<head><title>Flask Streaming Langchain Example</title></head>
<body>
<form id="form">
    <input name="prompt" value="write a short koan story about seeing beyond"/>
    <input type="submit"/>
</form>
<div id="output"></div>
<script>
    const formEl = document.getElementById('form');
    const outputEl = document.getElementById('output');
    let aborter = new AbortController();
    async function run() {
        aborter.abort();  // cancel previous request
        outputEl.innerText = '';
        aborter = new AbortController();
        const prompt = new FormData(formEl).get('prompt');
        try {
            const response = await fetch(
                '/chain', {
                    signal: aborter.signal,
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({
                        prompt
                    }),
                }
            );
            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            while (true) {
                const { done, value } = await reader.read();
                if (done) { break; }
                const decoded = decoder.decode(value, {stream: true});
                outputEl.innerText += decoded;
            }
        } catch (err) {
            console.error(err);
        }
    }
    run();  // run on initial prompt
    formEl.addEventListener('submit', function(event) {
        event.preventDefault();
        run();
    });
</script>
</body>
</html>
''', mimetype='text/html')

class ThreadedGenerator:
    def __init__(self):
        self.queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        item = self.queue.get()
        if item is StopIteration:
            raise item
        return item

    def send(self, data):
        self.queue.put(data)

    def close(self):
        self.queue.put(StopIteration)

class ChainStreamHandler(StreamingStdOutCallbackHandler):
    def __init__(self, gen):
        super().__init__()
        self.gen = gen

    def on_llm_new_token(self, token: str, **kwargs):
        self.gen.send(token)

def llm_thread(g, prompt):
    try:
        chat = ChatOpenAI(
            verbose=True,
            streaming=True,
            callbacks=[ChainStreamHandler(g)],
            temperature=0.7,
        )
        # chat models take a list of message objects, not a raw string
        chat([HumanMessage(content=prompt)])
    finally:
        g.close()

def chain(prompt):
    g = ThreadedGenerator()
    threading.Thread(target=llm_thread, args=(g, prompt)).start()
    return g

@app.route('/chain', methods=['POST'])
def _chain():
    return Response(chain(request.json['prompt']), mimetype='text/plain')

if __name__ == '__main__':
    app.run(threaded=True, debug=True)
Link to the original reply: https://gist.github.com/python273/563177b3ad5b9f74c0f8f3299ec13850
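One more note for anyone adapting this: besides wrapping the prompt in a HumanMessage, the handler is now passed via callbacks=[...] instead of callback_manager=BaseCallbackManager([...]). To verify outside the browser that tokens really arrive incrementally, here is a minimal smoke-test sketch (assuming the app is running on localhost:5000 and the requests package is installed; the prompt text is just an example):

import requests

# Stream the response from the /chain endpoint and print chunks as they arrive.
with requests.post(
    "http://127.0.0.1:5000/chain",
    json={"prompt": "write a short koan story about seeing beyond"},
    stream=True,
) as resp:
    resp.raise_for_status()
    # chunk_size=None yields data as soon as it is received
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)

If the fix works, the chunks print one token at a time instead of the whole poem appearing at once.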