Tags: multithreading, queue, real-time, flask-socketio, background-task

queue.get(block=True) while start_background_task (flask-socketio) is running and doing queue.put()


I have an issue with a queue that is filled by a background task that never ends (it runs continuously to grab real-time data).

What I want to achieve:

  1. Starting server via flask-socketio (eventlet),
  2. monkey_patch(),
  3. Using start_background_task, run a function from another file that grabs data in real time,
  4. While this background task is running (indefinitely), storing incoming data in a queue via queue.put(),
  5. Still while this task is running, watching the queue from the main program for new data and processing it, which here means calling socketio.emit().
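
The steps above amount to a producer/consumer pair sharing one queue. Here is a minimal sketch of that shape using only the standard library, with no Flask or eventlet involved. The names `produce`, `consume`, `run_demo` and `stop_event` are made up for illustration, and the `emit` argument is a stand-in for `socketio.emit`.

```python
import queue
import threading
import time


def produce(data_queue, stop_event):
    """Stand-in for the real-time reader: queue one packet per millisecond."""
    while not stop_event.is_set():
        data_queue.put(b'\xe5\xce\x04\x00\xfe\xd2\x04\x00')
        time.sleep(0.001)


def consume(data_queue, emit, max_items):
    """Forward max_items packets from the queue to emit(), then return."""
    sent = 0
    while sent < max_items:
        try:
            # Wait briefly for data instead of polling empty() first.
            data = data_queue.get(timeout=0.05)
        except queue.Empty:
            continue  # nothing arrived yet, wait again
        emit('raw_data', {'msg': data})
        sent += 1
    return sent


def run_demo(max_items=5):
    """Run producer and consumer together and return what was 'emitted'."""
    data_queue = queue.Queue()
    stop_event = threading.Event()
    emitted = []
    worker = threading.Thread(
        target=produce, args=(data_queue, stop_event), daemon=True
    )
    worker.start()
    try:
        consume(data_queue, lambda event, payload: emitted.append((event, payload)), max_items)
    finally:
        stop_event.set()        # ask the producer to stop
        worker.join(timeout=1)
    return emitted
```

The producer never finishes on its own; it only stops when `stop_event` is set, which is the situation described in step 4.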

What works: my program works well if, in the background task file, the while loop ends (while count < 100: for instance). In this case, I can access the queue from the main file and emit data.

What doesn't work: if this while loop is now while True:, the program blocks somewhere. I can't access the queue from the main program; it seems to wait until the background task returns or stops.

So I guess I'm missing something here... so if you guys can help me with that, or give me some clues, that would be awesome.

Here are some relevant parts of the code:

main.py

from threading import Thread
from threading import Lock
from queue import Queue

from get_raw_program import get_raw_data

from flask import Flask, send_from_directory, Response, jsonify, request, abort
from flask_socketio import SocketIO
import eventlet
eventlet.patcher.monkey_patch(select=True, socket=True)

app = Flask(__name__, static_folder=static_folder, static_url_path='')
app.config['SECRET_KEY'] = 'secret_key'

socketio = SocketIO(app, binary=True, async_mode="eventlet", logger=True, engineio_logger=True)
thread = None
thread_lock = Lock()
data_queue = Queue()


[...]

@socketio.on('WebSocket_On')
def grab_raw_data(test):
    global thread

    with thread_lock:
        if thread is None:
            socketio.emit('raw_data', {'msg': 'Thread is None:'})
            socketio.emit('raw_data', {'msg': 'Starting Thread... '})
            thread = socketio.start_background_task(target=get_raw_data(data_queue, test['mode']))

        while True:
            if not data_queue.empty():
                data = data_queue.get(block=True, timeout=0.05)
                socketio.emit('raw_data', {'msg': data})
                socketio.sleep(0.0001)
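
A note on the consumer loop in the handler above: `socketio.sleep()` is only reached when the queue holds data, so while the queue is empty the loop repeats without ever sleeping, and the whole loop runs inside `with thread_lock:`. A minimal sketch of one alternative is to drain whatever is queued and then return, so the caller can sleep on every pass. The `drain` helper is hypothetical, and `emit` stands in for `socketio.emit`.

```python
import queue


def drain(data_queue, emit):
    """Emit everything currently queued; return how many items were sent."""
    sent = 0
    while True:
        try:
            data = data_queue.get_nowait()
        except queue.Empty:
            return sent  # queue is empty, hand control back to the caller
        emit('raw_data', {'msg': data})
        sent += 1
```

In the handler this would be called in a loop as `drain(data_queue, socketio.emit)` followed by `socketio.sleep(...)` on every iteration, outside the `with thread_lock:` block.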

get_raw_program.py (which works, can access queue from main.py)

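import time

def get_raw_data(data_queue, test):
    count = 0
    # Finite loop: returns after 100 samples
    while count < 100:
        # Push one 8-byte sample onto the shared queue
        data_queue.put(b'\xe5\xce\x04\x00\xfe\xd2\x04\x00')
        time.sleep(0.001)
        count += 1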

get_raw_program.py (which DOESN'T work, can't access queue from main.py)

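import time

def get_raw_data(data_queue, test):
    count = 0
    # Infinite loop: this function never returns
    while True:
        # Push one 8-byte sample onto the shared queue
        data_queue.put(b'\xe5\xce\x04\x00\xfe\xd2\x04\x00')
        time.sleep(0.001)
        count += 1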

Solution

  • I tried with a regular Thread instead of start_background_task, and it works well. Thanks again for your help, greatly appreciated :-)
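
A possible explanation, judging only from the code shown in the question: `target=get_raw_data(data_queue, test['mode'])` calls `get_raw_data` right there in the handler and passes its return value as `target`. With `while count < 100:` that call returns and the handler carries on. With `while True:` it never returns, so the consumer loop is never reached. The sketch below reproduces the difference with the standard library's `threading.Thread`; the `worker` function and the `calls` list are made up for illustration.

```python
import threading

calls = []


def worker(label):
    # Record which thread actually ran the function.
    calls.append((label, threading.current_thread().name))


main_name = threading.current_thread().name

# Calling the function in the argument list: worker('called') runs here,
# in the current thread, and its return value (None) becomes the target.
t1 = threading.Thread(target=worker('called'), name='bg-1')
t1.start()
t1.join()

# Passing the function and its arguments separately: worker runs in bg-2.
t2 = threading.Thread(target=worker, args=('passed',), name='bg-2')
t2.start()
t2.join()
```

`start_background_task` also takes the callable and its arguments separately, so the equivalent call there would be `socketio.start_background_task(get_raw_data, data_queue, test['mode'])`.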