I have a function that submits several tasks to a `ThreadPoolExecutor` and returns a list of the `Future`s created by each submission:
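```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List

executor = ThreadPoolExecutor()  # the executor the tasks are submitted to

def submit_tasks() -> List[Future]:
    futures = []
    for i in range(10):
        future = executor.submit(Task())  # Task is a callable defined elsewhere
        futures.append(future)
    return futures

def submit() -> Future:
    futures = submit_tasks()
    # I would like this function to return a single Future that clients can use to check
    # whether all futures in the list have completed. How to do that?
    ...
```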
I'm on Python 3.8.
I needed exactly the same thing and tried the accepted answer, but it does not do what was asked.
Because Python's standard library does not provide such a feature, I created a small helper class that combines multiple `Future` objects into a single `Future`. The `complete_when` argument controls when the combined future completes; it takes the same constants as `concurrent.futures.wait` (`FIRST_COMPLETED`, `FIRST_EXCEPTION`, `ALL_COMPLETED`).
The class:
```python
import concurrent.futures
from concurrent.futures import Future
from typing import Optional, Set


class CombinedFuture(Future):
    """
    This class provides "waiting" mechanisms similar to concurrent.futures.wait(...) except that there is no blocking wait.
    This class extends concurrent.futures.Future and thus it can be used like any other Future.
    You can use the .result() and .done() (and other) methods and also use this class with the aforementioned concurrent.futures.wait function.
    The result is the first completed future (FIRST_COMPLETED), the first future that raised (FIRST_EXCEPTION),
    or None once all futures have completed.

    This class is especially useful when you want to combine multiple futures with and (&&) and or (||) logic.

    Example:
    Consider you have multiple parallel tasks (as futures) and a future that will be completed once your function should return (cancellation_future).
    You want to wait until all tasks finish normally or the cancellation_future is completed.
    With the standard Python library this is not possible, because concurrent.futures.wait(...) can either wait for all futures or for any one of them.
    ALL_COMPLETED would also wait for cancellation_future, which may never complete. FIRST_COMPLETED would return as soon as a single task future completed.
    The following code uses CombinedFuture to solve this problem.

    .. code-block:: python

        def create_task() -> Future:
            # TODO add logic that completes this future
            return Future()

        # can be completed any time
        cancellation_future = Future()
        task_futures = [create_task(), create_task()]
        task_combined_future = CombinedFuture(*task_futures, complete_when=concurrent.futures.ALL_COMPLETED)

        # FIRST_COMPLETED: return as soon as either the cancellation or the combined task future is done
        done, not_done = concurrent.futures.wait([cancellation_future, task_combined_future], timeout=None, return_when=concurrent.futures.FIRST_COMPLETED)
        if cancellation_future in done:
            print("cancellation_future was completed")
        else:
            print("task_combined_future was completed")
    """

    def __init__(self, *futures: Future, complete_when: str = concurrent.futures.FIRST_COMPLETED) -> None:
        super().__init__()
        # FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED (these constants are strings)
        self.complete_when = complete_when
        self.futures: Set[Future] = set(futures)
        self.completed_futures: Set[Future] = set()
        if not self.futures:
            # nothing to wait for; otherwise this future would never complete
            self.set_result(None)
            return
        for future in self.futures:
            # the callback runs immediately if the future is already done
            future.add_done_callback(self._future_completed_callback)

    def _set_result_safe(self, result: Optional[Future]) -> None:
        try:
            self.set_result(result)
        except concurrent.futures.InvalidStateError:  # Python 3.8+
            # The result was already set. This can happen when:
            # - a second future completes, or several complete at "the same time"
            # - the user called set_result or changed the complete_when attribute (neither is supported)
            pass

    def _future_completed_callback(self, future: Future) -> None:
        self.completed_futures.add(future)
        if self.complete_when == concurrent.futures.FIRST_COMPLETED:
            # no count check required because we only need one and we just added our future
            self._set_result_safe(future)
            return
        elif self.complete_when == concurrent.futures.FIRST_EXCEPTION:
            # exception() raises CancelledError for a cancelled future, so check that first
            if not future.cancelled() and future.exception(timeout=0) is not None:
                # future completed with exception
                self._set_result_safe(future)
        # else: should be concurrent.futures.ALL_COMPLETED
        # but we also want this logic in the FIRST_EXCEPTION case
        if self.completed_futures == self.futures:
            self._set_result_safe(None)
```
(Code originally posted here: https://gist.github.com/Klotzi111/9ab06b0380702cd5f4044c7529bdc096)
Usage with the OP's code:
```python
import concurrent.futures
from concurrent.futures import Future

# add the import for the CombinedFuture class here

def submit() -> Future:
    futures = submit_tasks()
    return CombinedFuture(*futures, complete_when=concurrent.futures.ALL_COMPLETED)
```
This works because `CombinedFuture` is a subclass of `Future`.
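If only the `ALL_COMPLETED` behaviour is needed, the same idea (a done callback on every input future that resolves a fresh `Future`) also fits in a plain function. The following is a minimal sketch assuming Python 3.8+; the name `all_of` is hypothetical and not part of the standard library. It counts completions under a lock instead of comparing sets, so exactly one callback sets the result. As with `ALL_COMPLETED` in `concurrent.futures.wait`, failed and cancelled input futures count as completed.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def all_of(futures: List[Future]) -> Future:
    """Hypothetical helper: a Future that resolves to None once every input future is done."""
    combined: Future = Future()
    remaining = len(futures)
    lock = threading.Lock()

    if remaining == 0:
        # Nothing to wait for: complete immediately, like wait([]) returns at once.
        combined.set_result(None)
        return combined

    def on_done(_: Future) -> None:
        nonlocal remaining
        with lock:
            remaining -= 1
            last = remaining == 0
        if last:
            # Only the callback that brought the counter to zero sets the result.
            combined.set_result(None)

    for f in futures:
        # Runs immediately in this thread if f is already done.
        f.add_done_callback(on_done)
    return combined


with ThreadPoolExecutor(max_workers=4) as executor:
    tasks = [executor.submit(pow, 2, i) for i in range(10)]
    everything = all_of(tasks)
    print(everything.result())           # None (blocks until all ten tasks are done)
    print(all(t.done() for t in tasks))  # True
```

The returned object is an ordinary `Future`, so a client can call `done()`, `result()` or `add_done_callback()` on it exactly as with `CombinedFuture`.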
`concurrent.futures.wait` does not work
The question explicitly asks for a `Future` that completes when all futures have completed. `concurrent.futures.wait` does not return a `Future`; it blocks until all the given futures have completed (with the default `return_when=ALL_COMPLETED`) and then returns a named 2-tuple of sets of futures, `(done, not_done)`.
Usage:
```python
import concurrent.futures

done, not_done = concurrent.futures.wait([future1, future2, future3])
# done will contain all 3 futures (because the return_when argument defaults to ALL_COMPLETED)
```
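A runnable variant of the usage snippet, as a sketch: the `work` function and the sleep durations are made up for illustration. It shows that `wait` blocks the caller and hands back plain sets rather than a `Future`, and that with a `timeout` the unfinished futures end up in `not_done`.

```python
import concurrent.futures
import time
from concurrent.futures import ThreadPoolExecutor


def work(seconds: float) -> float:
    # Hypothetical task that just sleeps for the given time.
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=2) as executor:
    fast = executor.submit(work, 0.1)
    slow = executor.submit(work, 2.0)
    # The calling thread blocks here for up to 1 second; nothing is returned
    # that another function could attach a callback to or poll later.
    done, not_done = concurrent.futures.wait([fast, slow], timeout=1.0)
    print(type(done).__name__, type(not_done).__name__)  # set set
    print(fast in done, slow in not_done)                # True True
```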
Why return a `Future` instead of using `concurrent.futures.wait`
A `Future` can be passed on to other functions, which can then register done callbacks (via `add_done_callback()`) or poll the `Future` for completion (via `done()`) without blocking.
For other reasons, see the docstring of the `CombinedFuture` class.
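To illustrate the callback and polling points with nothing but the standard `Future` interface, here is a small sketch; `watch` is a hypothetical consumer function, and a plain executor future stands in for the combined one (a `CombinedFuture` would be used the same way).

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def watch(fut: Future, log: List[str]) -> None:
    """Hypothetical consumer: it needs nothing but the Future interface."""
    # The callback runs in the thread that completes fut,
    # or immediately in this thread if fut is already done.
    fut.add_done_callback(lambda f: log.append("finished: %r" % (f.result(),)))


log: List[str] = []
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(sum, [1, 2, 3])
    watch(future, log)

    # Non-blocking polling: done() returns immediately, so the caller can
    # do other work between checks instead of blocking in wait().
    while not future.done():
        time.sleep(0.01)

# Leaving the with-block joins the worker thread, so the callback has run by now.
print(future.result())  # 6
print(log)              # ['finished: 6']
```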