Search code examples
pythonmultithreadingconcurrencythreadpoolconcurrent.futures

How do I wait when all ThreadPoolExecutor threads are busy?


My understanding of how a ThreadPoolExecutor works is that when I call #submit, tasks are assigned to threads until all available threads are busy, at which point the executor puts the tasks in a queue awaiting a thread becoming available.

The behavior I want is to block when there is not a thread available, to wait until one becomes available and then only submit my task.

The background is that my tasks are coming from a queue, and I only want to pull messages off my queue when there are threads available to work on these messages.

In an ideal world, I'd be able to provide an option to #submit to tell it to block if a thread is not available, rather than putting them in a queue.

However, that option does not exist. So what I'm looking at is something like:

    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
        while True:
            wait_for_available_thread(executor)
            message = pull_from_queue()
            executor.submit(do_work_for_message, message)

And I'm not sure of the cleanest implementation of wait_for_available_thread.

Honestly, I'm surprised this isn't actually in concurrent.futures, as I would have thought the pattern of pulling from a queue and submitting to a thread pool executor would be relatively common.


Solution

  • One approach might be to keep track of your currently running threads via a set of Futures:

        active_threads = set()
        def pop_future(future):
            active_threads.pop(future)
    
        with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
            while True:
                while len(active_threads) >= CONCURRENCY:
                    time.sleep(0.1)  # or whatever
                message = pull_from_queue()
                future = executor.submit(do_work_for_message, message)    
                active_threads.add(future)
                future.add_done_callback(pop_future)
    

    A more sophisticated approach might be to have the done_callback be the thing that triggers a queue pull, rather than polling and blocking, but then you need to fall back to polling the queue if the workers manage to get ahead of it.