Search code examples
flaskraspberry-pistreamingzeromqpyzmq

Streaming video from Raspberry Pi on web server to multiple users


I have a Raspberry Pi, using Wifi, it is running a people counting model, and will send the processed image to my server using a ZeroMQ socket. On this server I build a web server, using Flask. I got an error that is not shown during the streaming for the first person who accessed the website, but the next access following the first one will fail:

zmq.error.ZMQError: Address in use

What should i do to more people access my server and can see video streaming?

The server code :

from flask         import Flask, render_template, Response, request, jsonify
from flask_restful import Api
import numpy as np
from api.configs   import configs
from flask_cors    import CORS
import zmq
import cv2
import base64

app = Flask(__name__, static_url_path='/static')
api_restful = Api(app)
cors = CORS(app)

def gen():
    context = zmq.Context()
    footage_socket = context.socket(zmq.SUB)
    footage_socket.bind('tcp://*:{}'.format(configs.SOCKET_PORT))
    footage_socket.setsockopt_string(zmq.SUBSCRIBE, np.unicode(''))
    while True:
        frame = footage_socket.recv()
        npimg = np.fromstring(frame, dtype=np.uint8)
        source = cv2.imdecode(npimg, 1)
        s = cv2.imencode('.jpg', source)[1].tobytes()
        yield ( b'--frame\r\n'
                b'Content-Type: image/jpeg\r\n\r\n' + s + b'\r\n' )

@app.route('/video_feed')
def video_feed():
    return Response( gen(),
                     mimetype = 'multipart/x-mixed-replace; boundary=frame'
                     )

if __name__ == '__main__':
    app.run( debug    = True,
             threaded = True,
             port     = configs.PORT,
             host     = configs.HOST
             )

The Raspberry Pi client code :

import socket
import logging as log
import numpy as np
from api.configs import configs
import zmq
import numpy as np
import cv2

context = zmq.Context()
footage_socket = context.socket(zmq.PUB)
footage_socket.connect( 'tcp://{}:{}'.format( configs.SERVER_HOST,
                                              configs.SOCKET_PORT )
                         )
time.sleep(1)
cap = cv2.VideoCapture(0)
while True:
    while cap.isOpened():
        ret, frame = cap.read()
        frame = cv2.resize(frame, (640, 480))
        encoded, buffer_ = cv2.imencode('.jpg', frame)
        footage_socket.send(buffer_)

I omitted some code to make it easier to see


Solution

  • I cannot speak about the details hidden inside the Flask() web-served video re-wrapping/delivery, yet the ZeroMQ part seems to be the trouble, so :

    a )
    it seems that every web-watcher spawns another @app.route( '/video_feed' )-decorated Flask()-instance ( check the mode - if just a thread-based, or a process-based - it is important, whether a central Context() instance may or cannot get shared and efficiently re-used ).

    b )
    the code, residing inside there hooked Response()-feeder, attempts to .bind() for each subsequent Flask()-served user, which for obvious reasons collides, as the resource ( the address:port ) has been POSACK'ed for exclusive use for the first-came, first served visitor ( any next one must crash and crashed as documented above ).


    PROFESSIONAL SOLUTION ( or just a quick-but-dirty-fix ? ):

    For indeed small scale use-cases, it shall be enough to reverse the .bind()/.connect()-s, i.e. the PUB will .bind() ( as it does not replicate itself and can serve any small counts of further SUB(s), arriving via .connect()-s )

    However, this quick-fix is a bit dirty. This should never be done in a production-grade system. The workload envelope is soon to grow above reasonable levels. The proper architecture should work in a different manner.

    • instantiate a central zmq.Context( configs.nIOthreads )-instance ( could be shared to re-use in called methods ), having thus also a central scaling of the nIOthreads for performance reasons.

    • instantiate a one, central SUB-instance, that communicates with Rpi and collects video-frames, to avoid any re-duplications of image/serving

    • configure both sides to use .setsockopt( zmq.CONFLATE ) to principally avoid any re-broadcasting of a video "History of No Value"

    • instantiate all the @app-decorated Flask()-tools to re-use the central Context()-instance and to internally connect to a central video-re-PUB-lisher, using a SUB-side of another PUB/SUB archetype, here, using an efficient memory-mapped inproc://, again using .setsockopt( zmq.CONFLATE )

    • configure all resources to have higher robustness - at least with an explicit .setsockopt( zmq.LINGER, 0 )

    • detect and handle all error-states, as the ZeroMQ API documents all of them well enough to help you diagnose + manage any failed state professionally and safely