My understanding of how a ThreadPoolExecutor works is that when I call #submit, tasks are assigned to threads until all available threads are busy, at which point the executor puts the tasks in a queue to wait for a thread to become available.
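A small demo of that behavior, with a made-up task function and made-up timings: `submit` returns immediately even when every worker is busy. The extra tasks sit in the executor's internal work queue, which in CPython is unbounded.

```python
import concurrent.futures
import threading
import time

release = threading.Event()

def slow_task(i):
    # Hypothetical task: blocks until the main thread releases it.
    release.wait()
    return i

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    start = time.monotonic()
    # Submit far more tasks than there are workers; none of these calls block.
    futures = [executor.submit(slow_task, i) for i in range(10)]
    submit_time = time.monotonic() - start
    # Two tasks occupy the workers; the other eight wait in the internal queue.
    not_done = sum(not f.done() for f in futures)
    release.set()
    results = [f.result() for f in futures]

print(f"submitted 10 tasks in {submit_time:.4f}s, {not_done} still pending")
```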
The behavior I want is to block when no thread is available, wait until one frees up, and only then submit my task.
The background is that my tasks are coming from a queue, and I only want to pull messages off my queue when there are threads available to work on these messages.
In an ideal world, I'd be able to pass an option to #submit telling it to block if no thread is available, rather than putting the task in a queue.
However, that option does not exist. So what I'm looking at is something like:
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        wait_for_available_thread(executor)
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
And I'm not sure of the cleanest implementation of wait_for_available_thread.
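One possible shape for wait_for_available_thread, sketched with a threading.BoundedSemaphore holding one permit per worker. A permit is acquired before a message is pulled and released from a done callback once the task finishes, so no more than CONCURRENCY messages are ever pulled but unfinished. Assumptions: the in-process queue.Queue source, the None "no more messages" signal and the in_flight counters are hypothetical stand-ins for the demo, and this version of wait_for_available_thread takes no executor argument because the semaphore, not the executor, tracks capacity.

```python
import concurrent.futures
import queue
import threading
import time

CONCURRENCY = 4

# Hypothetical message source standing in for the real queue.
source = queue.Queue()
for n in range(20):
    source.put(n)

def pull_from_queue():
    # Hypothetical: returns None once the demo source is exhausted.
    try:
        return source.get_nowait()
    except queue.Empty:
        return None

lock = threading.Lock()
results = []
in_flight = 0      # messages pulled but not yet fully processed
max_in_flight = 0

def do_work_for_message(message):
    global in_flight
    time.sleep(0.01)  # simulate work
    with lock:
        results.append(message * 2)
        in_flight -= 1

# One permit per worker thread: taken before a message is pulled,
# given back only when that message's task has finished.
slots = threading.BoundedSemaphore(CONCURRENCY)

def wait_for_available_thread():
    slots.acquire()  # blocks while all CONCURRENCY permits are in use

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        wait_for_available_thread()
        message = pull_from_queue()
        if message is None:
            slots.release()  # nothing to run, so return the permit
            break
        with lock:
            in_flight += 1
            max_in_flight = max(max_in_flight, in_flight)
        future = executor.submit(do_work_for_message, message)
        # Release the permit when the task finishes, whether it succeeded or raised.
        future.add_done_callback(lambda f: slots.release())
```

Because the permit is held from just before the pull until the task completes, a message is only taken off the queue when a worker is guaranteed to be free for it, which is the blocking behavior the question asks for.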
Honestly, I'm surprised this isn't already in concurrent.futures, as I would have thought the pattern of pulling from a queue and submitting to a thread pool executor would be relatively common.
One approach might be to keep track of your currently running threads via a set of Futures:
import concurrent.futures
import time

active_threads = set()

def pop_future(future):
    # Runs when the future finishes. set.pop() takes no argument,
    # so use discard() to remove this specific future.
    active_threads.discard(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        while len(active_threads) >= CONCURRENCY:
            time.sleep(0.1)  # or whatever
        message = pull_from_queue()
        future = executor.submit(do_work_for_message, message)
        active_threads.add(future)
        future.add_done_callback(pop_future)
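A variation on the same set-of-Futures idea replaces the sleep loop with concurrent.futures.wait(..., return_when=FIRST_COMPLETED), which blocks until at least one task finishes. Here only the main thread touches the set, so no done callback is needed. The demo queue, the None end-of-stream signal and the work function are hypothetical stand-ins. Finished futures stay counted until the next wait call, which errs on the side of submitting too little rather than too much.

```python
import concurrent.futures
import queue
import time

CONCURRENCY = 3

# Hypothetical message source standing in for the real queue.
source = queue.Queue()
for n in range(10):
    source.put(n)

def pull_from_queue():
    # Hypothetical: returns None once the demo source is exhausted.
    try:
        return source.get_nowait()
    except queue.Empty:
        return None

def do_work_for_message(message):
    time.sleep(0.01)  # simulate work
    return message + 100

results = []
active = set()
max_active = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        if len(active) >= CONCURRENCY:
            # Block until at least one task finishes instead of polling with sleep().
            done, active = concurrent.futures.wait(
                active, return_when=concurrent.futures.FIRST_COMPLETED)
            results.extend(f.result() for f in done)
        message = pull_from_queue()
        if message is None:
            break
        active.add(executor.submit(do_work_for_message, message))
        max_active = max(max_active, len(active))
    # Drain the tasks that are still running.
    done, _ = concurrent.futures.wait(active)
    results.extend(f.result() for f in done)
```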
A more sophisticated approach might be to have the done_callback trigger the queue pull, rather than polling and blocking, but then you need to fall back to polling the queue if the workers manage to get ahead of it.
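A rough sketch of that callback-driven variant, under several assumptions: the source is an in-process queue.Queue, STOP is a hypothetical sentinel (one per chain) marking the end of the stream, and POLL_INTERVAL is a made-up retry interval for when the workers get ahead of the queue. One "chain" is seeded per worker, and each finished task's done callback pulls and submits the next message. Two caveats: the polling happens inside the callback, so it occupies that worker thread while it waits, and if a task has already finished when add_done_callback is called, the callback runs immediately in the calling thread.

```python
import concurrent.futures
import queue
import threading
import time

CONCURRENCY = 3
POLL_INTERVAL = 0.05  # hypothetical retry interval when the queue is empty
STOP = object()       # hypothetical end-of-stream sentinel, one per chain

source = queue.Queue()
for n in range(12):
    source.put(n)
for _ in range(CONCURRENCY):
    source.put(STOP)

lock = threading.Lock()
results = []
chains_left = CONCURRENCY
all_done = threading.Event()

def pull_from_queue():
    # Polling fallback: retry until a message (or STOP) arrives.
    while True:
        try:
            return source.get(timeout=POLL_INTERVAL)
        except queue.Empty:
            continue  # the workers got ahead of the queue; poll again

def do_work_for_message(message):
    time.sleep(0.01)  # simulate work
    with lock:
        results.append(message * 10)

def pull_and_submit(executor):
    global chains_left
    message = pull_from_queue()
    if message is STOP:
        with lock:
            chains_left -= 1
            if chains_left == 0:
                all_done.set()
        return
    future = executor.submit(do_work_for_message, message)
    # When this task finishes, its callback pulls the next message,
    # so a new message is only taken once a slot has been freed.
    future.add_done_callback(lambda f: pull_and_submit(executor))

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    for _ in range(CONCURRENCY):
        pull_and_submit(executor)  # seed one chain per worker
    all_done.wait()  # keep the executor open until every chain has stopped
```

The main thread must wait on all_done before leaving the `with` block; otherwise the executor would shut down and the callbacks' later submit calls would raise RuntimeError.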