I have a simple websocket server that is supposed to feed camera frames to an Electron application. The code is mostly taken from here. The idea is that when I open the websocket, Python starts a while loop that continuously sends frames, which are captured by the JS 'client'. However, to be able to stop this while loop I need to run the method on an executor, because otherwise the server gets stuck in it. The loop is supposed to run as long as an event is not set, and this event is set in the on_close method of the websocket.
I made a separate method that simulates the camera; in the actual app the frames come from another process.
The problem is that after the program has been running for a while, my logs are flooded with: ERROR:asyncio:Task was destroyed but it is pending!
My assumption is that there are some synchronization issues between the main IOLoop and the executor's loop. I have also tried starting/stopping a thread that sends frames when opening/closing the websocket, but got the same issue.
Here is my code:
ws_server.py
import cv2
import numpy
import asyncio
import traceback
from concurrent.futures import ThreadPoolExecutor
from tornado.websocket import WebSocketHandler
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop
from tornado.web import Application
from threading import Event


class StreamHandler(WebSocketHandler):
    executor = ThreadPoolExecutor()

    def check_origin(self, origin):
        return True

    def initialize(self):
        self.stop_event = Event()
        self.camera = self.frame_generator()

    def frame_generator(self):
        while not self.stop_event.is_set():
            frame = numpy.random.randint(0, 255, (800, 800))
            _, frame = cv2.imencode('.jpg', frame)
            frame = frame.tostring()
            yield frame

    def open(self):
        print("Connection opened.")
        self.send_frames()

    @run_on_executor
    def send_frames(self):
        asyncio.set_event_loop(asyncio.new_event_loop())
        while not self.stop_event.is_set():
            img = next(self.camera)
            if img is not None:
                try:
                    self.write_message(img, binary=True)
                except:
                    print(traceback.format_exc())

    def on_close(self):
        self.stop_event.set()
        print("Connection closed.")


def main():
    print("starting server.")
    app = Application([
        (r"/camera", StreamHandler),
    ])
    app.listen(8083)
    IOLoop.instance().start()


if __name__ == '__main__':
    main()
client.js
var img = document.getElementById("liveImg");
var arrayBuffer;
var ws = new WebSocket("ws://localhost:8083/camera");
ws.binaryType = 'arraybuffer';

ws.onopen = function(){
    console.log("connection was established");
};

ws.onmessage = function(evt){
    arrayBuffer = evt.data;
    img.src = "data:image/jpeg;base64," + encode(new Uint8Array(arrayBuffer));
};

function encode (input) {
    var keyStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
    var output = "";
    var chr1, chr2, chr3, enc1, enc2, enc3, enc4;
    var i = 0;
    while (i < input.length) {
        chr1 = input[i++];
        chr2 = i < input.length ? input[i++] : Number.NaN; // Not sure if the index
        chr3 = i < input.length ? input[i++] : Number.NaN; // checks are needed here
        enc1 = chr1 >> 2;
        enc2 = ((chr1 & 3) << 4) | (chr2 >> 4);
        enc3 = ((chr2 & 15) << 2) | (chr3 >> 6);
        enc4 = chr3 & 63;
        if (isNaN(chr2)) {
            enc3 = enc4 = 64;
        } else if (isNaN(chr3)) {
            enc4 = 64;
        }
        output += keyStr.charAt(enc1) + keyStr.charAt(enc2) +
                  keyStr.charAt(enc3) + keyStr.charAt(enc4);
    }
    return output;
}
index.html
<html>
  <head>
    <title>livecamera</title>
    <img id="liveImg" width="480" height="360"></canvas>
    <script type="text/javascript" src="./client.js"></script>
  </head>
</html>
And here is an example of what ends up in my logs:
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
ERROR:asyncio:Task was destroyed but it is pending! task: .wrapper() running at /home/vladinny/Projects/photo-booth-cv/venv/lib/python3.6/site-packages/tornado/websocket.py:1102>>
Don't create new event loops in the executor thread.
write_message must be called from the same event loop thread that is handling the connection. That means that if you're using an executor, you must pass messages back and forth between the executor and the event loop, so that you do your blocking tasks on the executor and write your websocket messages on the event loop. Creating a new event loop in the executor will bypass the errors about there not being an event loop in the thread, but it's incorrect - the point of those errors is that you must use the same event loop, not create a new one.
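To illustrate the "pass messages back and forth" idea, here is a minimal sketch using only the standard library, so it runs without Tornado, OpenCV or a camera. The names `produce_frames` and `stream`, the byte-string frames and the `sent` list are all hypothetical stand-ins: the worker thread plays the role of the blocking camera code, and appending to `sent` stands in for the `write_message` call. In a Tornado handler the equivalent thread-safe handoff would be `IOLoop.add_callback`, which is documented as safe to call from other threads.

```python
import asyncio
import threading


def produce_frames(loop, queue, stop_event, count):
    """Runs in a worker thread: does the blocking work, never touches the socket."""
    for i in range(count):
        if stop_event.is_set():
            break
        frame = b"frame-%d" % i  # stand-in for a blocking camera read
        # Hand the frame over to the event loop thread. call_soon_threadsafe
        # is the thread-safe way to schedule a call on a loop from outside it.
        loop.call_soon_threadsafe(queue.put_nowait, frame)
    # Sentinel: tells the consumer that no more frames are coming.
    loop.call_soon_threadsafe(queue.put_nowait, None)


async def stream(count=3):
    """Runs on the event loop thread: the only place that 'writes' frames."""
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    stop_event = threading.Event()
    sent = []  # stand-in for the websocket connection

    worker = threading.Thread(
        target=produce_frames, args=(loop, queue, stop_event, count))
    worker.start()
    while True:
        frame = await queue.get()
        if frame is None:
            break
        sent.append(frame)  # in the handler: self.write_message(frame, binary=True)
    worker.join()
    return sent
```

Calling `asyncio.run(stream(3))` returns the three frames in the order they were produced. The worker thread never creates or touches an event loop of its own; it only asks the existing loop to run `queue.put_nowait` on its behalf.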
In practice, I recommend doing as much as possible on the event loop thread and only passing specific pieces of blocking work to the executor:
async def send_frames(self):
    while not self.stop_event.is_set():
        img = await IOLoop.current().run_in_executor(self.executor, next, self.camera)
        if img is not None:
            try:
                await self.write_message(img, binary=True)
            except:
                print(traceback.format_exc())
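Two details are worth spelling out when adapting this. First, because `send_frames` is now a coroutine, calling it bare from `open()` only creates a coroutine object; one option is to schedule it with `IOLoop.current().spawn_callback(self.send_frames)`. Second, the generator in the question finishes once `stop_event` is set, and a `StopIteration` raised by `next` inside the executor does not propagate cleanly through a future, so passing a default to `next` is safer. The sketch below shows the same loop shape with the standard library only, so it can be run without Tornado; `frame_generator`, `write` and `demo` are hypothetical stand-ins for the camera generator, `write_message` and the handler.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def frame_generator(stop_event, limit):
    """Hypothetical stand-in for the camera generator in the question."""
    for i in range(limit):
        if stop_event.is_set():
            return
        yield b"frame-%d" % i


async def send_frames(camera, write, executor):
    loop = asyncio.get_running_loop()
    while True:
        # Only the blocking call runs on the executor. next(camera, None)
        # returns None once the generator is exhausted instead of raising
        # StopIteration in the worker thread.
        img = await loop.run_in_executor(executor, next, camera, None)
        if img is None:
            break
        # The write happens on the event loop thread.
        await write(img)


async def demo(limit=3):
    stop_event = threading.Event()
    sent = []

    async def write(img):  # stand-in for self.write_message(img, binary=True)
        sent.append(img)

    with ThreadPoolExecutor(max_workers=1) as executor:
        await send_frames(frame_generator(stop_event, limit), write, executor)
    return sent
```

`asyncio.run(demo(3))` returns the three generated frames. Setting `stop_event` (as `on_close` does in the question) makes the generator return, the next `next(camera, None)` yields `None`, and the loop exits on its own without any task being left pending.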