
How to use concurrent.futures.wait?


I'm studying Python, and I've run into a problem with concurrent.futures.wait() -- here are the details: I want to make the main process hold until all the child processes have completed, so I used wait() to block the main process. But I always get an error, please kindly help.

def child_process(args):
    pid=os.getpid();
    while (args.len() > 0 ):
        task=args.pop(0)
        time.sleep(1+ random.random()*5)   #simulate the worker time
    print("Process "+str(pid)+" : "+task[0]+"  "+task[1])
    return


if (__name__  ==  "__main__") :

    mgr = multiprocessing.Manager()
    tasks=mgr.list()
    tasks=[[1,10],[2,20],[3,30],[4,40],[5,50],[6,60]]

    
    #executor=ProcessPoolExecutor(max_workers=3)
    f=[]
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        f.append(executor.submit(child_process,tasks))
        f.append(executor.submit(child_process,tasks))
        f.append(executor.submit(child_process,tasks))
                
#        wait(future,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)

        concurrent.futures.wait(f[0])
        concurrent.futures.wait(f[1])
        concurrent.futures.wait(f[2])
        
        executor.shutdown()

The error is --

C:\Work\python\src\test>python test.py
Traceback (most recent call last):
  File "C:\Work\python\src\test\test.py", line 70, in <module>
    concurrent.futures.wait(f[0])
  File "C:\tools\Python310\lib\concurrent\futures\_base.py", line 290, in wait
    fs = set(fs)
TypeError: 'Future' object is not iterable

This puzzles me the most -- isn't f[0] a Future object returned by submit()?

Then I tried with --

wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)

The new error is --

C:\Work\python\src\test>python test.py
C:\Work\python\src\test\test.py:68: RuntimeWarning: coroutine 'wait' was never awaited
  wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I really don't know how to fix it. Please kindly advise. Thanks

Regards, Eisen


Solution

  • A few things to point out:

    • Wrapping the expression in parentheses in a while statement is redundant.
    >>> a = 0
    >>> while a < 10:
    ...     a += 1
    

    • The error message says "'Future' object is not iterable" - which means the f[0] you passed is indeed a Future object, but that is not what the wait method was expecting.
    >>> from concurrent import futures
    >>> help(futures.wait)
    Help on function wait in module concurrent.futures._base:
    
    wait(fs, timeout=None, return_when='ALL_COMPLETED')
        Wait for the futures in the given sequence to complete.
    
        Args:
            fs: The sequence of Futures (possibly created by different Executors) to
                wait upon.
    # ...
    

    Here we can see that the argument fs actually expects a sequence of Futures.

    So instead of this:

    concurrent.futures.wait(f[0])
    concurrent.futures.wait(f[1])
    concurrent.futures.wait(f[2])
    

    You probably want this:

    concurrent.futures.wait(f)
    

    This is still not strictly required, though, since the with block waits until all processes stop.
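
    For reference, here's a minimal sketch (the work() function and the numbers are made up purely for illustration) showing that wait() takes a sequence of futures and hands back (done, not_done) sets, and that leaving the with block already does the waiting for you:

    import time
    from concurrent import futures


    def work(n):
        # hypothetical toy task, purely for illustration
        time.sleep(n)
        return n * n


    if __name__ == "__main__":
        with futures.ProcessPoolExecutor(max_workers=3) as executor:
            fs = [executor.submit(work, n) for n in (1, 2, 3)]

            # wait() expects a *sequence* of futures and returns two sets
            done, not_done = futures.wait(fs, return_when=futures.ALL_COMPLETED)
            print(len(done), len(not_done))  # 3 0

        # leaving the `with` block calls executor.shutdown(wait=True) for us,
        # so the explicit wait() above is optional here

    The (done, not_done) return value is mostly useful with return_when=FIRST_COMPLETED; with ALL_COMPLETED you can simply ignore it.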


    Here's a fuller demonstration using a shared task queue:

    """
    Demo codes for https://stackoverflow.com/q/71458088/10909029
    
    Waiting for child process to complete
    """
    
    import os
    import math
    import queue
    import multiprocessing as mp
    from concurrent import futures
    
    
    def child_process(task_queue: mp.Queue):
        # If this doesn't work, save this function in a separate file - especially when using a REPL or Jupyter.
        pid = os.getpid()
    
        print(f"[{pid}] Started!")
        processed_count = 0
    
        while True:
            try:
                item = task_queue.get_nowait()
            except queue.Empty:
                # queue is empty - nothing left to process
                break
    
            # else continue on
            # some workload
            try:
                print(f"[{pid}] {item}! = {math.factorial(item)}")
    
            finally:
                # tell queue we processed the item.
                task_queue.task_done()
                processed_count += 1
    
        print(f"[{pid}] Task done!")
    
    
    def main():
        # merely wrapping the code in a function namespace makes it a tiny bit faster
    
        mp_manager = mp.Manager()
        task_queue = mp_manager.Queue()
    
        # populate queue
        for n in range(100):
            task_queue.put_nowait(n)
    
        # start pool
        with futures.ProcessPoolExecutor() as executor:
            future_list = [executor.submit(child_process, task_queue) for _ in range(5)]
    
            # could use executor.shutdown(wait=True) here instead,
            # but it's not required: the executor waits for all processes to stop when exiting the `with` block,
            # so there is also no need to call executor.shutdown() manually.
            futures.wait(future_list)
    
    
    if __name__ == '__main__':
        main()
    

    Output:

    [18412] Started!
    [18412] 0! = 1
    [4680] Started!
    [18412] 1! = 1
    [2664] Started!
    [18412] 2! = 2
    [18412] 3! = 6
    [17900] Started!
    [18412] 4! = 24
    [18412] 5! = 120
    [4680] 6! = 720
    [4680] 7! = 5040
    [18412] 8! = 40320
    [17900] 9! = 362880
    [4680] 10! = 3628800
    [18412] 11! = 39916800
    
    ...
    
    [17900] 21! = 51090942171709440000
    [4680] 22! = 1124000727777607680000
    [2664] 23! = 25852016738884976640000
    [16792] Started!
    [18412] 24! = 620448401733239439360000
    [17900] 25! = 15511210043330985984000000
    
    ...
    
    [17900] 99! = 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000
    [18412] Task done!
    [17900] Task done!
    [16792] Task done!
    [2664] Task done!
    [4680] Task done!
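
    As an optional extra (not part of the original demo): once the with block exits, calling result() on each future re-raises any exception that happened inside a child process. Without this step, a bug like the args.len() call in the question's child_process would fail silently. A short sketch, reusing future_list from the demo above:

    for future in future_list:
        try:
            future.result()  # re-raises any exception from the child process
        except Exception as exc:
            print(f"A child process failed: {exc!r}")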