I have a Raspberry Pi on Wi-Fi running a people-counting model. It sends each processed image to my server over a ZeroMQ socket, and on that server I run a web server built with Flask. The stream works without any error for the first person who opens the website, but every access after the first one fails with:
zmq.error.ZMQError: Address in use
What should I do so that more people can access my server and see the video stream?
The server code:
from flask import Flask, render_template, Response, request, jsonify
from flask_restful import Api
import numpy as np
from api.configs import configs
from flask_cors import CORS
import zmq
import cv2
import base64
app = Flask(__name__, static_url_path='/static')
api_restful = Api(app)
cors = CORS(app)

def gen():
    context = zmq.Context()
    footage_socket = context.socket(zmq.SUB)
    footage_socket.bind('tcp://*:{}'.format(configs.SOCKET_PORT))
    footage_socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))
    while True:
        frame = footage_socket.recv()
        npimg = np.fromstring(frame, dtype=np.uint8)
        source = cv2.imdecode(npimg, 1)
        s = cv2.imencode('.jpg', source)[1].tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + s + b'\r\n')

@app.route('/video_feed')
def video_feed():
    return Response(gen(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

if __name__ == '__main__':
    app.run(debug=True,
            threaded=True,
            port=configs.PORT,
            host=configs.HOST)
The Raspberry Pi client code:
import socket
import time
import logging as log
import numpy as np
from api.configs import configs
import zmq
import cv2

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect('tcp://{}:{}'.format(configs.SERVER_HOST,
                                            configs.SOCKET_PORT))
time.sleep(1)  # give the connection time to be set up before publishing
cap = cv2.VideoCapture(0)
while True:
    while cap.isOpened():
        ret, frame = cap.read()
        frame = cv2.resize(frame, (640, 480))
        encoded, buffer_ = cv2.imencode('.jpg', frame)
        footage_socket.send(buffer_)
I omitted some code to make it easier to read.
I cannot speak to the details hidden inside Flask's web-served video re-wrapping and delivery, yet the ZeroMQ part seems to be the trouble, so:
a ) It seems that every web viewer triggers another invocation of the @app.route( '/video_feed' )-decorated Flask handler ( check the mode, whether it is thread-based or process-based; this is important, because it decides whether a central Context() instance can be shared and efficiently re-used ).
b ) The code residing inside the generator that feeds Response() attempts to .bind() for each subsequent Flask-served user. This collides, because the resource ( the address:port ) has already been acquired for exclusive use by the first-come, first-served visitor, so the .bind() of any later visitor fails with the "Address in use" error documented above.
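The collision described in b ) can be reproduced without Flask or a camera. The following is only a minimal sketch, assuming pyzmq is installed; the function name second_bind_errno is made up for this illustration, and a random local port stands in for configs.SOCKET_PORT:

```python
import zmq

def second_bind_errno():
    """Bind two sockets to the same TCP port; return the errno of the failure."""
    ctx = zmq.Context()
    first = ctx.socket(zmq.SUB)    # plays the first visitor's socket
    second = ctx.socket(zmq.SUB)   # plays the second visitor's socket
    try:
        # The first .bind() succeeds and takes the port for itself.
        port = first.bind_to_random_port('tcp://127.0.0.1')
        try:
            # The second .bind() on the very same port is what gen() does
            # for every visitor after the first one.
            second.bind('tcp://127.0.0.1:{}'.format(port))
        except zmq.ZMQError as err:
            return err.errno
        return None                # no collision observed
    finally:
        first.close(linger=0)
        second.close(linger=0)
        ctx.term()

if __name__ == '__main__':
    print(second_bind_errno() == zmq.EADDRINUSE)
```

The second socket never gets the port, no matter which Context() it was created from, which is why creating a fresh context per request does not help.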
For truly small-scale use cases, it should be enough to reverse the .bind()/.connect() roles, i.e. the PUB will .bind() ( it does not replicate itself and can serve a small number of further SUB sockets arriving via .connect() ), and each Flask-side SUB will .connect() to the Raspberry Pi's address.
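A rough sketch of the reversed roles, assuming pyzmq and using a local random port in place of the real Raspberry Pi address ( the function name and the b'frame' payload are placeholders for this illustration only ):

```python
import zmq

def demo_reversed_roles(n_viewers=2):
    """One PUB binds once; every viewer's SUB only connects."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)                  # the camera side
    pub.setsockopt(zmq.LINGER, 0)
    port = pub.bind_to_random_port('tcp://127.0.0.1')

    subs = []
    for _ in range(n_viewers):                 # one SUB per web viewer
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.LINGER, 0)
        sub.setsockopt(zmq.SUBSCRIBE, b'')
        sub.connect('tcp://127.0.0.1:{}'.format(port))   # connect, never bind
        subs.append(sub)

    # PUB/SUB drops messages sent before a subscription has arrived, so keep
    # publishing until every viewer has seen at least one frame.
    received = [None] * n_viewers
    for _ in range(200):
        pub.send(b'frame')
        for i, sub in enumerate(subs):
            if received[i] is None and sub.poll(10):     # wait up to 10 ms
                received[i] = sub.recv()
        if all(r is not None for r in received):
            break

    for sub in subs:
        sub.close()
    pub.close()
    ctx.term()
    return received
```

Any number of .connect() calls can target the one bound port, so adding a viewer no longer raises an error. The cost is that the PUB side sends a separate copy of each frame to every connected SUB.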
However, this quick fix is a bit dirty and should never be used in a production-grade system. The workload soon grows above reasonable levels, because every viewer opens its own connection to the Raspberry Pi and each frame is then sent over Wi-Fi once per viewer. A proper architecture works differently:
- Instantiate one central zmq.Context( configs.nIOthreads ) instance ( it can be shared and re-used in the called methods ), which also gives a single place to scale nIOthreads for performance reasons.
- Instantiate one central SUB instance that communicates with the Raspberry Pi and collects the video frames, to avoid duplicating the image transfer for every viewer.
- Configure both sides with .setsockopt( zmq.CONFLATE, 1 ), set before .bind()/.connect(), so that only the most recent frame is kept and no video "History of No Value" is ever re-broadcast.
- Make all the @app-decorated Flask handlers re-use the central Context() instance and connect internally to a central video re-publisher, using the SUB side of another PUB/SUB archetype, here over the efficient in-process inproc:// transport, again with .setsockopt( zmq.CONFLATE, 1 ).
- Configure all resources for higher robustness, at least with an explicit .setsockopt( zmq.LINGER, 0 ).
- Detect and handle all error states; the ZeroMQ API documents all of them well enough to help you diagnose and manage any failed state professionally and safely.
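The steps above could be sketched roughly as follows. This is an illustration under assumptions, not a finished implementation: the names INTERNAL, new_socket, relay, fake_camera, viewer and demo are all made up here, fake_camera stands in for the Raspberry Pi, a random local port replaces configs.SOCKET_PORT, and CONFLATE is set on the receiving sockets only.

```python
import threading
import time
import zmq

CTX = zmq.Context.instance()      # one central Context, shared by all threads
INTERNAL = 'inproc://frames'      # hypothetical name of the internal re-publisher

def new_socket(kind, conflate=False):
    """Create a socket with LINGER = 0 and, optionally, CONFLATE = 1."""
    sock = CTX.socket(kind)
    sock.setsockopt(zmq.LINGER, 0)
    if conflate:
        sock.setsockopt(zmq.CONFLATE, 1)     # set before bind/connect
    if kind == zmq.SUB:
        sock.setsockopt(zmq.SUBSCRIBE, b'')
    return sock

def relay(info, ready, stop):
    """The only place that binds the camera port; runs once, in one thread."""
    upstream = new_socket(zmq.SUB, conflate=True)
    info['port'] = upstream.bind_to_random_port('tcp://127.0.0.1')
    internal = new_socket(zmq.PUB)
    internal.bind(INTERNAL)
    ready.set()
    while not stop.is_set():
        if upstream.poll(50):                # wait at most 50 ms for a frame
            internal.send(upstream.recv())   # re-publish to in-process viewers
    upstream.close()
    internal.close()

def fake_camera(port, stop):
    """Stand-in for the Raspberry Pi: a PUB that connects and keeps sending."""
    pub = new_socket(zmq.PUB)
    pub.connect('tcp://127.0.0.1:{}'.format(port))
    n = 0
    while not stop.is_set():
        pub.send(b'frame-%d' % n)
        n += 1
        time.sleep(0.01)
    pub.close()

def viewer():
    """What each Flask gen() would do: connect ( never bind ) to the relay."""
    sub = new_socket(zmq.SUB, conflate=True)
    sub.connect(INTERNAL)
    try:
        while True:
            yield sub.recv()
    finally:
        sub.close()

def demo(n_viewers=3, rounds=2):
    """Serve several viewers at once from a single upstream connection."""
    info, ready, stop = {}, threading.Event(), threading.Event()
    threads = [threading.Thread(target=relay, args=(info, ready, stop))]
    threads[0].start()
    ready.wait()                             # inproc endpoint is bound now
    threads.append(threading.Thread(target=fake_camera,
                                    args=(info['port'], stop)))
    threads[1].start()
    gens = [viewer() for _ in range(n_viewers)]
    frames = [[next(g) for g in gens] for _ in range(rounds)]
    for g in gens:
        g.close()                            # runs each viewer's finally block
    stop.set()
    for t in threads:
        t.join()
    return frames
```

In a Flask application the relay thread would be started once at startup, and gen() would wrap each frame yielded by something like viewer() in the multipart JPEG boundary. Because inproc:// only works between sockets of the same Context, this layout fits a thread-based server; a process-based one would need a different internal transport such as ipc:// or tcp://.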