
Threading Tasks Queue in Flask


Hey guys, I'm building a webhook with Flask, and I'm currently creating a new thread for every request so that the thread can do some heavy processing without delaying the response to the request. The problem is that at some point I have so many threads open that it starts causing problems, and I was wondering if I could set up a queue in Flask to limit the app to something like 8 threads at a time.

My code:

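import json
import os
from threading import Thread

import boto3
from flask import request

# app (the Flask instance), S3_REGION, TEMP_PATH, stt and
# get_sentimentAI_results are defined elsewhere in the application.

def process_audio(bucket_name, key, _id, extension):
    S3_CLIENT = boto3.client('s3', region_name=S3_REGION)
    print('Running audio processing')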
    INPUT_FILE = os.path.join(TEMP_PATH, f'{_id}.{extension}')
    print(f'Saving downloaded file to {INPUT_FILE}')
    S3_CLIENT.download_file(bucket_name, key, INPUT_FILE)
    print('File downloaded')
    process = stt.process_audio(INPUT_FILE)
    print(f'Audio processed by AI returned: "{process}"')
    stt.reset()
    ai = get_sentimentAI_results(process)
    if ai:
        print(f'Text processed by AI returned class {ai[0]} with a certainty of {ai[1]}%')
        return True

    print('Request to sentiment AI endpoint failed for an unknown reason. Check CloudWatch for more information!')
    return False

@app.route('/process/audio', methods=['POST'])
def process_new_audio():
    print('Receiving new request')
    data = request.data

    if not data:
        return '', 400  # missing body is a bad request, not "not found"

    data = json.loads(data)
    bucket_name = data.get('bucket_name')
    key = data.get('key')
    _id = data.get('id')
    extension = data.get('file_extension')

    if not key or not bucket_name or not _id or not extension:
        return '', 400

    thread = Thread(target=process_audio, kwargs={'bucket_name': bucket_name, 'key': key, '_id': _id, 'extension': extension})
    thread.start()
    
    return '', 200

Summary of the problem:

This Flask function works as a webhook triggered by AWS Lambda. It creates a thread to process the data so that the Lambda doesn't have to wait for the processing to finish. I just need a way to queue the work so that if 100 requests come in, I don't have 100 threads running, only 5 at a time, for example.


Solution

  • This is a very common problem in multithreading. The solution you need here is the producer-consumer pattern: a producer (the entity that creates work) pushes work items into a thread-safe queue, and a fixed number of consumers (worker threads) take items off the queue one at a time, waiting whenever it is empty.

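    Because only that fixed set of workers ever runs, this caps the number of threads no matter how many requests arrive. A neat way to do this is the concurrent.futures module in Python's standard library, whose ThreadPoolExecutor(max_workers=...) implements this pattern for you. @aaron has given an appropriate link for that.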
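A minimal sketch of the producer-consumer pattern built by hand with the standard library's queue.Queue and a fixed pool of threading.Thread workers. handle_task, NUM_WORKERS and the results list are hypothetical stand-ins for illustration (handle_task plays the role of the question's process_audio); the workers are started once, and the "producer" loop stands in for the Flask route, which would only call task_queue.put(...) and return.

```python
import queue
import threading
import time

NUM_WORKERS = 5  # hypothetical cap; pick what your machine can handle

task_queue = queue.Queue()  # thread-safe FIFO shared by producer and consumers
results = []                # collected only so the demo can be checked
results_lock = threading.Lock()


def handle_task(task_id):
    """Hypothetical stand-in for the heavy work (e.g. process_audio)."""
    time.sleep(0.01)  # simulate slow processing
    return task_id * 2


def worker():
    """Consumer: take tasks off the queue one at a time, forever."""
    while True:
        task_id = task_queue.get()  # blocks while the queue is empty
        try:
            result = handle_task(task_id)
            with results_lock:
                results.append(result)
        finally:
            task_queue.task_done()  # lets task_queue.join() track completion


# Start a fixed pool of daemon workers once, at application start-up.
for i in range(NUM_WORKERS):
    threading.Thread(target=worker, name=f"worker-{i}", daemon=True).start()

# Producer: each incoming request would just enqueue work and return.
for task_id in range(100):
    task_queue.put(task_id)

task_queue.join()  # demo only: wait until every queued task is processed
print(len(results))  # 100
```

However many tasks are queued, only NUM_WORKERS threads ever exist; extra work simply waits in the queue.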
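Here is a sketch of the concurrent.futures approach, assuming a single module-level ThreadPoolExecutor shared by the whole app. process_audio_stub is a hypothetical placeholder for the question's process_audio, and the running/peak_running counters exist only to show that no more than max_workers calls execute at once, even when 100 are submitted.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 5  # hypothetical cap, like the "only 5 at a time" in the question

# One executor for the whole app, created once at start-up.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

running = 0       # tasks currently executing
peak_running = 0  # highest value `running` ever reached
counter_lock = threading.Lock()


def process_audio_stub(_id):
    """Hypothetical stand-in for the question's process_audio()."""
    global running, peak_running
    with counter_lock:
        running += 1
        peak_running = max(peak_running, running)
    time.sleep(0.01)  # simulate heavy work
    with counter_lock:
        running -= 1
    return _id


# submit() puts the call on the executor's internal queue and returns a
# Future immediately, so a request handler can still return right away.
futures = [executor.submit(process_audio_stub, i) for i in range(100)]

# Demo only: wait for everything and check that at most 5 ran at once.
done_ids = [f.result() for f in futures]
print(peak_running <= MAX_WORKERS)  # True
executor.shutdown(wait=True)
```

In the Flask route, the Thread(...) and thread.start() lines would become something like executor.submit(process_audio, bucket_name=bucket_name, key=key, _id=_id, extension=extension). Two caveats: an exception raised inside the task is stored on the returned Future and is not printed unless you call result() or attach a callback with add_done_callback(), and if the app runs under several server processes, each process gets its own pool.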