Tags: python, python-3.x, asynchronous, timeout, concurrent.futures

How to ensure a timeout per each Future in an iterator of concurrent.futures?


The documentation around timeouts for concurrent.futures is very challenging to understand. In a simple case, I'd like to use a ProcessPoolExecutor by calling .submit in a loop that scans through a list of job functions. I want each of these Future objects to have an associated timeout of 10 minutes, but otherwise for them to complete asynchronously.
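
For concreteness, here is a minimal sketch of the setup I have in mind (the function and argument names are made up purely for illustration):

    from concurrent.futures import ProcessPoolExecutor
    import time

    def crunch(dataset_name):
        # stand-in for a long-running job; the names here are illustrative only
        time.sleep(1)
        return dataset_name

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=3) as pool:
            futures_list = [pool.submit(crunch, name) for name in ["a", "b", "c", "d"]]
            # Goal: each of these futures should be limited to 600 seconds of
            # processing, while otherwise completing asynchronously.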

My first approach was to try to use the as_completed function, which produces an iterator of the Future objects and only yields the next when one has completed. as_completed accepts a timeout parameter but the documentation says this timeout is relative to the first moment when as_completed is called, and not necessarily the lifetime of any Future object itself.

E.g. suppose the ProcessPoolExecutor only has 3 worker processes but the list of Future objects contains 10 items. 7 of the items may sit unprocessed for 10 minutes or more while the first 3 are being worked on. Shortly thereafter, the timeout from as_completed trips and raises an error, even though each individual Future may have stayed within the 10-minute limit on its own.
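
Scaled down to seconds so it can actually run, a small sketch shows the behavior (the worker counts and sleep times here are arbitrary illustrations):

    from concurrent import futures
    import time

    def work(seconds):
        time.sleep(seconds)
        return seconds

    if __name__ == "__main__":
        # 2 workers, 6 tasks of 2 seconds each, and a 3-second "per task" budget.
        # Every task needs well under 3 seconds of processing once it starts,
        # but the later tasks queue behind the earlier ones and the single
        # shared deadline of as_completed trips anyway.
        with futures.ProcessPoolExecutor(max_workers=2) as pool:
            fs = [pool.submit(work, 2) for _ in range(6)]
            try:
                for f in futures.as_completed(fs, timeout=3):
                    print("finished:", f.result())
            except futures.TimeoutError:
                print("timed out, even though no single task ran longer than 2 seconds")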

Note that the same limitations that apply to as_completed also apply to wait, and wait is harder to use for this case because of the limited return options it supports.

My next thought was to use the timeout parameter that future.result accepts and call f.result(timeout=600) for each f (Future) in my list of futures. However, there is no way to set this timeout without actually demanding the result in a blocking way. If you iterate the list of futures and call f.result(...), each call blocks for up to the specified timeout (or until that particular result is ready), so the waits happen one after another in submission order.
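
As an illustration (again scaled down to seconds), the waits below happen strictly in submission order:

    from concurrent.futures import ProcessPoolExecutor
    import time

    def slow(seconds):
        time.sleep(seconds)
        return seconds

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as pool:
            futures_list = [pool.submit(slow, s) for s in (3, 1, 1)]
            # Each .result call blocks for up to 600 seconds or until that
            # particular result is ready; the first (slow) future is waited on
            # before the later ones are even looked at.
            print([f.result(timeout=600) for f in futures_list])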

On the other hand, you cannot combine f.result with as_completed either, in a naive but seemingly correct way like

[f.result(timeout=600) for f in as_completed(futures_list)]

... because as_completed quietly waits, with no per-task deadline, while the futures complete, and only yields each one after it has already finished. By then .result has nothing left to wait for, so the timeout passed to it never has any effect.
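
This is easy to confirm: by the time as_completed yields a future, it is already done, so the timeout handed to .result is never consulted. Another scaled-down sketch:

    from concurrent.futures import ProcessPoolExecutor, as_completed
    import time

    def slow(seconds):
        time.sleep(seconds)
        return seconds

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=2) as pool:
            futures_list = [pool.submit(slow, s) for s in (1, 2, 3)]
            for f in as_completed(futures_list):
                print(f.done())               # always True at this point
                print(f.result(timeout=600))  # returns immediately; the timeout is moot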

Given this, what is the right pattern to produce a list of Futures where each one has its own individual timeout and then wait on them asynchronously to finish?


Solution

  • It appears there is no way to supply a per-Future timeout in this kind of asynchronous context. The available API functions wait and as_completed take the easier road by supporting a single global timeout across all tasks in an iterable of Future objects; they make no attempt to measure time from when a Future actually starts being processed.

    I chose a workaround of splitting my task list into chunks and calling as_completed on each chunk. The chunk size is set equal to the number of workers the ProcessPoolExecutor is configured with, so every task in a chunk starts being processed immediately and the "global" timeout of as_completed effectively acts as a per-Future timeout. The downside is reduced utilization: when tasks finish early, the pool cannot grab the next Future right away; it has to wait until the entire chunk is done before the next batch is submitted. For me this is acceptable, but it is a significant usability deficiency of concurrent.futures that this trade-off is forced at all.

    Here is some example code. Suppose that my_task_list already contains functions with some or all required arguments bound via functools.partial or other means (a small sketch of building such a list follows the code). You could modify this so that arguments are supplied in a separate iterable of tuples or dicts and passed into submit as needed.

    from concurrent.futures import ProcessPoolExecutor, as_completed

    # wrapped in a function so the final `return` is valid; num_workers doubles
    # as the chunk size, and my_timeout (e.g. 600 seconds) applies per chunk
    def run_in_chunks(my_task_list, num_workers, my_timeout):
        with ProcessPoolExecutor(max_workers=num_workers) as pool:
            all_results = []
            for chunk_start in range(0, len(my_task_list), num_workers):
                chunk = my_task_list[chunk_start:chunk_start + num_workers]
                # could extract parameters to pass for this task chunk here.
                futures = [pool.submit(task) for task in chunk]
                # every task in this chunk starts immediately, so the "global"
                # timeout below effectively bounds each task's runtime
                all_results += [
                    f.result() for f in as_completed(futures, timeout=my_timeout)
                ]
            return all_results
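
    For instance, my_task_list could be prepared by binding arguments ahead of time with functools.partial. This is only an illustrative sketch; process_record and record_ids are made-up names, and anything you bind must be picklable because it is shipped to the worker processes:

    import functools

    def process_record(record_id, retries=3):
        # placeholder for the real per-record work
        return record_id, retries

    record_ids = [101, 102, 103]

    # each entry is a zero-argument callable that pool.submit(task) can run
    my_task_list = [
        functools.partial(process_record, rid, retries=5) for rid in record_ids
    ]

    # results = run_in_chunks(my_task_list, num_workers=3, my_timeout=600)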
    

    Note that if you choose num_workers higher than the number of processors actually available to the ProcessPoolExecutor, a chunk will contain more tasks than can run at once, and you are back in the situation where the as_completed timeout does not bound the runtime of each task. That will likely lead to the same kind of timeout errors as calling as_completed or wait on the whole task list with no chunking.
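
    One way to guard against that (an illustrative helper, not part of the pattern above) is to cap the worker count, and therefore the chunk size, at the number of CPUs the process can actually use:

    import os

    def capped_worker_count(desired_workers):
        # chunk size == worker count in the pattern above, so keeping it at or
        # below the CPUs actually available means every task in a chunk starts
        # right away and the as_completed timeout really bounds each task
        if hasattr(os, "sched_getaffinity"):       # respects CPU affinity on Linux
            available = len(os.sched_getaffinity(0))
        else:
            available = os.cpu_count() or 1
        return max(1, min(desired_workers, available))

    num_workers = capped_worker_count(16)  # e.g. ask for 16, capped to the machine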