Search code examples
pythonfuturepython-multithreadingfunctorconcurrent.futures

How to functionally compose futures?


I have a thread object that I can't distribute across a ProcessPoolExecutor, but would like to return a future. If I already have a future, is there a way to apply to its completed value, eg, Future a -> (a -> b) -> Future b?

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
    *** something magic goes here ***
    #Trying to transform it into Future(int,Event)

Solution

  • Here's a way which uses a simpler setup code, without threading.Event as that doesn't seem necessary to solve the problem. Basically, you can create future_b as a new Future() yourself, and use the add_done_callback method on future_a to set the result of future_b. Here, func_a is the computation to compute the result of future_a, and func_b is the computation to compute the result of future_b using the result of future_a.

    from concurrent.futures import ProcessPoolExecutor, Future
    
    def func_a(x):
        return 2 + x
    
    def func_b(x):
        return 10 * x
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(3)
        future_a = pool.submit(func_a, 3)
    
        future_b = Future()
        future_b.set_running_or_notify_cancel()
    
        def callback(f):
            x = f.result()
            y = func_b(x)
            future_b.set_result(y)
    
        future_a.add_done_callback(callback)
    
        print(future_b.result()) # 50
    

    If you want a helper function to do this, you can write one: map_future takes a future and a mapping function, and returns the new mapped future as required. This version handles an exception in case f.result() or func_b throws one:

    def map_future(future_a, func):
        future_b = Future()
        future_b.set_running_or_notify_cancel()
    
        def callback(f):
            try:
                x = f.result()
                y = func(x)
                future_b.set_result(y)
            except Exception as e:
                future_b.set_exception(e)
    
        future_a.add_done_callback(callback)
        return future_b
    

    Caveats: this goes against the advice in the documentation for the Future class, which says:

    Future instances are created by Executor.submit() and should not be created directly except for testing.

    Also, if you have any errors which aren't subclasses of Exception in the callback, they will be "logged and ignored" according to the docs. I've chosen to only catch Exception in this code for simplicity, but you might prefer the sys.exc_info()[0] way of catching every possible thing that could be raised.