The documentation around timeouts for concurrent.futures is very challenging to understand. In a simple case, I'd like to use a ProcessPoolExecutor by calling .submit in a loop that scans through a list of job functions. I want each of these Future objects to have an associated timeout of 10 minutes, but otherwise for them to complete asynchronously.
My first approach was to try the as_completed function, which produces an iterator over the Future objects and yields each one only once it has completed. as_completed accepts a timeout parameter, but the documentation says this timeout is measured from the moment as_completed is called, not over the lifetime of any individual Future object.
E.g. suppose the ProcessPoolExecutor has only 3 worker processes but the list of Future objects contains 10 items. Seven of the items may sit in an unprocessed state for up to 10 minutes while the first 3 are processed. Shortly thereafter, the timeout from as_completed will be tripped, resulting in a failure, even though each individual Future may have met the 10-minute limit on its own.
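For illustration, here is a minimal sketch of that failure mode. The task function slow_task is a made-up placeholder and the timings are scaled down: 8 seconds stands in for an ~8-minute job, and timeout=10 stands in for the 10-minute limit.

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def slow_task(seconds):
    # Stand-in for one of the real jobs; here it just sleeps.
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as pool:
        # 10 tasks, but only 3 run at once, so later tasks sit queued.
        futures = [pool.submit(slow_task, 8) for _ in range(10)]
        # The timeout clock starts at this call and covers all 10 futures
        # together. Iterating raises TimeoutError once 10 seconds have
        # passed and futures are still pending, even though no single task
        # runs longer than the limit on its own.
        for f in as_completed(futures, timeout=10):
            print(f.result())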
Note that the same limitation that applies to as_completed also applies to wait, and wait is harder to use for this case because of the limited return options it supports.
My next thought was to use the timeout parameter that future.result accepts and call f.result(timeout=600) for each f (a Future) in my list of futures. However, there is no way to set this timeout without actually demanding the result in a blocking way: if you iterate the list of futures and call f.result(...), each call blocks for up to the specified timeout.
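To make the blocking concrete, here is a minimal sketch of that pattern; futures_list is just a placeholder name for the list of submitted futures.

# Sketch of the blocking pattern described above; futures_list is assumed to
# already hold the Future objects returned by pool.submit(...).
results = []
for f in futures_list:
    # Each call parks the calling thread here for up to 600 seconds while it
    # waits for this particular future; the timeout cannot be attached to the
    # future and left to fire in the background.
    results.append(f.result(timeout=600))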
On the other hand, you also cannot combine f.result with as_completed in a naive but seemingly correct way like

[f.result(timeout=600) for f in as_completed(futures_list)]

... because as_completed is already waiting asynchronously for the futures to complete, and only yields them to have .result called after they have finished, so the timeout on .result never has a chance to take effect.
Given this, what is the right pattern to produce a list of Futures where each one has its own individual timeout, and then wait on them asynchronously to finish?
It appears there is no way to supply a per-Future timeout in this kind of asynchronous context. The available API functions wait and as_completed take the easier road by supporting a global timeout across all tasks in an iterable of Future objects; they do not attempt to measure the time from when a Future actually starts being processed.
I chose a workaround of splitting my task list into chunks and using as_completed for each chunk. The chunk size is set to the number of workers that my ProcessPoolExecutor is configured to use, so that the "global" timeout of as_completed effectively functions as a per-Future timeout, since all tasks in a chunk begin processing right away. The downside is somewhat lower utilization: the process pool is not free to grab the next task when tasks finish early; it has to wait for the entire next batch. For me this is acceptable, but it is a significant usability deficiency of concurrent.futures that I am forced into this choice.
Here is some example code. Suppose that my_task_list already contains functions with some or all required arguments bound via functools.partial or other means. You could modify this so that arguments are supplied in a separate iterable of tuples or dicts and passed into submit as needed.
from concurrent.futures import ProcessPoolExecutor, as_completed

my_task_list = ...  # define your list of task functions
num_workers = ...   # set number of workers
my_timeout = ...    # define your timeout (in seconds)

with ProcessPoolExecutor(max_workers=num_workers) as pool:
    all_results = []
    for chunk_start in range(0, len(my_task_list), num_workers):
        chunk = my_task_list[chunk_start:chunk_start + num_workers]
        # could extract parameters to pass for this task chunk here.
        futures = [pool.submit(task) for task in chunk]
        all_results += [
            f.result() for f in as_completed(futures, timeout=my_timeout)
        ]
    # all_results now holds the results from every chunk
Note that if you choose num_workers higher than the number of processors available to the ProcessPoolExecutor, you'll end up with more tasks than processors within a given chunk and return to the situation where the timeout of as_completed won't correctly apply to the runtime of each task, likely leading to the same kind of timeout errors as using as_completed or wait on the overall task list with no chunking.
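One way to guard against that, assuming you are sizing the pool yourself and using os.cpu_count() as a rough proxy for the processors available, is to clamp num_workers before creating the executor:

import os

# Keep the pool size (and therefore the chunk size) at or below the number
# of CPUs, so every task in a chunk starts running immediately.
num_workers = min(num_workers, os.cpu_count() or 1)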