Tags: python, python-3.x, concurrency, concurrent.futures

How to make a Python concurrent Future that signals when a list of Futures has completed?


I have a function that submits several tasks to a ThreadPoolExecutor and returns a list of the Futures created by each submission:

from concurrent.futures import Future, ThreadPoolExecutor
from typing import List

executor = ThreadPoolExecutor()

def submit_tasks() -> List[Future]:
    futures = []

    for i in range(10):
        # Task() is my own callable object
        future = executor.submit(Task())
        futures.append(future)

    return futures

def submit() -> Future:
    futures = submit_tasks()

    # I would like this function to return a single Future that clients can use to check
    # whether all futures in the list have completed. How to do that?
    ...

I'm on Python 3.8.

I would like this function to return a single Future that clients can use to check whether all futures in the list have completed. How to do that?


Solution

  • I needed exactly the same thing and tried the accepted answer, but it does not do what was asked.

    My solution to this problem

    Because the Python standard library does not provide such a feature, I created a small helper class that combines multiple Future objects into a single Future. When it completes is configured with the complete_when argument, which takes the same constants as concurrent.futures.wait (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED).

    The class:

    import concurrent.futures
    from concurrent.futures import Future
    from typing import Any
    
    # Subclassing plain Future (not Future[Future | None]) keeps this importable on Python 3.8:
    # subscripting Future needs Python 3.9 and the "X | None" syntax needs Python 3.10.
    class CombinedFuture(Future):
        """
        This class provides "waiting" mechanisms similar to concurrent.futures.wait(...) except that there is no blocking wait.
        This class extends concurrent.futures.Future and thus it can be used like any other Future.
        You can use the .result() and .done() (and other) methods and also use this class with the aforementioned concurrent.futures.wait function.
        Its result is the future that triggered completion (FIRST_COMPLETED, or FIRST_EXCEPTION when a future raised),
        or None when completion was triggered by all futures being done.
        
        This class is especially useful when you want to combine multiple futures with AND and OR logic.
        Example:
        Suppose you have multiple parallel tasks (as futures) and a future that is completed when your function should return early (cancellation_future).
        You want to wait until all tasks finish normally or the cancellation_future is completed.
        With the standard Python library this is not possible because concurrent.futures.wait(...) can only wait for all of the given futures or for the first one.
        ALL_COMPLETED would also wait for cancellation_future, which may never complete, and FIRST_COMPLETED would return as soon as a single task future completes.
        The following code uses CombinedFuture to solve this problem.
        
        .. code-block:: python
            
            def create_task() -> Future:
                # TODO add logic that completes this future
                return Future()
            
            # can be completed any time
            cancellation_future = Future()
            task_futures = [create_task(), create_task()]
            
            task_combined_future = CombinedFuture(*task_futures, complete_when=concurrent.futures.ALL_COMPLETED)
            # FIRST_COMPLETED: return as soon as all tasks are done or cancellation is requested
            done, not_done = concurrent.futures.wait([cancellation_future, task_combined_future], timeout=None, return_when=concurrent.futures.FIRST_COMPLETED)
            
            if cancellation_future in done:
                print("cancellation_future was completed")
            else:
                print("task_combined_future was completed")
        """
    
        def __init__(self, *futures: Future, complete_when: str = concurrent.futures.FIRST_COMPLETED) -> None:
            self.complete_when = complete_when
            self.futures = set(futures)
            self.completed_futures = set()
            
            super().__init__()
    
            if not self.futures:
                # nothing to wait for: complete immediately, like concurrent.futures.wait([])
                self.set_result(None)
    
            for future in self.futures:
                # if future is already done, the callback runs immediately
                future.add_done_callback(self._future_completed_callback)
        
        def _set_result_safe(self, result: Any):
            try:
                self.set_result(result)
            except concurrent.futures.InvalidStateError:
                # this happens when this future is already done or was cancelled, e.g. when:
                # a second future completes or multiple complete at "the same time",
                # the user cancelled this future, or the user called set_result or
                # changed the complete_when attribute (neither of the last two is supported)
                pass
    
        def _future_completed_callback(self, future: Future) -> None:
            self.completed_futures.add(future)
            
            if self.complete_when == concurrent.futures.FIRST_COMPLETED:
                # no count check required because we only need one and we just added our future
                self._set_result_safe(future)
                return
            elif self.complete_when == concurrent.futures.FIRST_EXCEPTION:
                # a cancelled future has no exception; exception() would raise CancelledError
                if not future.cancelled() and future.exception(timeout=0) is not None:
                    # future completed with exception
                    self._set_result_safe(future)
            # else: should be concurrent.futures.ALL_COMPLETED
            # but we also want this logic in the FIRST_EXCEPTION case
            if self.completed_futures == self.futures:
                self._set_result_safe(None)
    

    (Code originally posted here: https://gist.github.com/Klotzi111/9ab06b0380702cd5f4044c7529bdc096)

    Usage with the OP's code:

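    import concurrent.futures
    from concurrent.futures import Future
    # also import the CombinedFuture class from wherever you defined it

    def submit() -> Future:
        futures = submit_tasks()
        # completes (with result None) once every future in the list is done
        return CombinedFuture(*futures, complete_when=concurrent.futures.ALL_COMPLETED)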
    

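    This works because CombinedFuture is a subclass of Future, so callers can use done(), result(), add_done_callback() and concurrent.futures.wait on it like on any other Future.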
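If you only need the ALL_COMPLETED behaviour, a smaller alternative is possible. The sketch below is a different technique from the class above: instead of subclassing Future, it returns a plain Future and completes it from a lock-protected countdown in each task's done callback. The names `all_completed` and `slow_square` are hypothetical, and `slow_square` merely stands in for the OP's `Task()`.

```python
import threading
import time
from concurrent.futures import Future, InvalidStateError, ThreadPoolExecutor
from typing import Iterable


def all_completed(futures: Iterable[Future]) -> Future:
    """Hypothetical helper: return a Future that completes with result None once
    every input future is done (finished, failed or cancelled)."""
    pending = list(futures)
    combined: Future = Future()
    if not pending:
        # Nothing to wait for, so complete immediately instead of never.
        combined.set_result(None)
        return combined

    remaining = len(pending)
    lock = threading.Lock()

    def on_done(_: Future) -> None:
        nonlocal remaining
        # Callbacks may run concurrently in different worker threads.
        with lock:
            remaining -= 1
            is_last = remaining == 0
        if is_last:
            try:
                combined.set_result(None)
            except InvalidStateError:
                # The caller cancelled `combined` before all tasks finished.
                pass

    for fut in pending:
        # If fut is already done, add_done_callback runs on_done immediately.
        fut.add_done_callback(on_done)
    return combined


def slow_square(x: int) -> int:
    # Stand-in for the OP's Task(): sleeps briefly, then returns a value.
    time.sleep(0.01)
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(slow_square, i) for i in range(10)]
    combined = all_completed(futures)
    combined.result()  # the caller decides where (and whether) to block
    print([f.result() for f in futures])  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

The trade-off is flexibility: CombinedFuture also supports FIRST_COMPLETED and FIRST_EXCEPTION and reports which future triggered completion, while this helper only answers "are they all done?".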

    Why concurrent.futures.wait does not work

    The question explicitly asks for a Future that completes when all futures have completed. concurrent.futures.wait does not return a Future: it blocks the calling thread until the given futures have completed (all of them, with the default return_when=ALL_COMPLETED) and then returns a named tuple of two sets of futures, done and not_done.
    Usage:

    done, not_done = concurrent.futures.wait([future1, future2, future3])
    # done will contain all 3 futures (because the return_when argument defaults to ALL_COMPLETED)
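To make the return type concrete, here is a small sketch with two hand-made futures (the names `first`, `second` and `result` are only for illustration). It shows that wait() hands back a named tuple of two sets rather than a Future; timeout=0 makes it return immediately instead of blocking on the unfinished future.

```python
from concurrent.futures import ALL_COMPLETED, Future, wait

first: Future = Future()
second: Future = Future()
first.set_result("first")  # `second` is never completed in this example

# timeout=0: return immediately instead of blocking until `second` is done
result = wait([first, second], timeout=0, return_when=ALL_COMPLETED)

print(type(result).__name__)  # DoneAndNotDoneFutures
print(result.done == {first}, result.not_done == {second})  # True True
```

Without timeout=0, the same call would block forever because nothing ever completes `second`, which is the kind of blocking a returned Future lets the caller avoid.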
    

    Why it might be needed to get a Future instead of using concurrent.futures.wait

    A Future can be passed on to other functions which then can register done callbacks (via add_done_callback) or can poll the Future for completion (via done()).
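A minimal sketch of that consumer side, assuming a hypothetical `register_logger` function that only receives the Future (not the list of tasks). It registers a done callback and can poll done() without blocking; a bare Future completed by hand stands in for a combined future such as CombinedFuture.

```python
from concurrent.futures import Future
from typing import List


def register_logger(fut: Future, log: List[str]) -> None:
    """Hypothetical consumer: it only receives a Future, not the task list."""
    # The callback receives the finished future as its only argument.
    fut.add_done_callback(lambda f: log.append(f"done: {f.result()!r}"))


log: List[str] = []
# A bare Future stands in for a combined future such as CombinedFuture.
handle: Future = Future()
register_logger(handle, log)

print(handle.done())  # False: polling with done() never blocks
handle.set_result(None)  # the owner of the future completes it
print(handle.done())  # True
print(log)  # ['done: None']
```

In this sketch the callback runs inside set_result in the main thread; with CombinedFuture it would run in whichever thread completes the last relevant task.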

    For other reasons see the documentation of the CombinedFuture class.