I'm studying python, here I got some problem on the concurrent.futures.wait() -- Here's the details-- I want to make the main process hold until all the child processes completed. So I used wait() to block the main process. But I always got error , please kind help.
def child_process(args):
pid=os.getpid();
while (args.len() > 0 ):
task=args.pop(0)
time.sleep(1+ random.random()*5) #simulate the worker time
print("Process "+str(pid)+" : "+task[0]+" "+task[1])
return
if (__name__ == "__main__") :
mgr = multiprocessing.Manager()
tasks=mgr.list()
tasks=[[1,10],[2,20],[3,30],[4,40],[5,50],[6,60]]
#executor=ProcessPoolExecutor(max_workers=3)
f=[]
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
f.append(executor.submit(child_process,tasks))
# wait(future,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
executor.shutdown()
The error is --
C:\Work\python\src\test>python test.py
Traceback (most recent call last):
File "C:\Work\python\src\test\test.py", line 70, in <module>
concurrent.futures.wait(f[0])
File "C:\tools\Python310\lib\concurrent\futures\_base.py", line 290, in wait
fs = set(fs)
TypeError: 'Future' object is not iterable
This puzzles me most -- is that f[0] not a future object returned by submit()?
Then I tried with --
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
the new error is --
C:\Work\python\src\test>python test.py
C:\Work\python\src\test\test.py:68: RuntimeWarning: coroutine 'wait' was never awaited
wait(f,timeout=0,return_when=concurrent.futures.ALL_COMPLETED)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
I really don't know how to fix it. Please kidn advise. Thanks
Regards Eisen
Few things to point out:
while
statement is redundant.>>> a = 0
>>> while a < 10:
... a += 1
f[0]
you passed is indeed Future Object
which is not wait method was expecting.>>> from concurrent import futures
>>> help(futures.wait)
Help on function wait in module concurrent.futures._base:
wait(fs, timeout=None, return_when='ALL_COMPLETED')
Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
# ...
Here we can see argument fs
actually expect you the Sequence
of Futures
.
So instead of this:
concurrent.futures.wait(f[0])
concurrent.futures.wait(f[1])
concurrent.futures.wait(f[2])
You probably want this:
concurrent.futures.wait(f)
Which is still not required since with
block wait until all processes stops.
Here's demonstration:
"""
Demo codes for https://stackoverflow.com/q/71458088/10909029
Waiting for child process to complete
"""
import os
import math
import queue
import multiprocessing as mp
from concurrent import futures
def child_process(task_queue: mp.Queue):
# If this doesn't work, save this function in other file. REPL or jupyter especially.
pid = os.getpid()
print(f"[{pid}] Started!")
processed_count = 0
while True:
try:
item = task_queue.get_nowait()
except queue.Empty:
# task done
break
# else continue on
# some workload
try:
print(f"[{pid}] {item}! = {math.factorial(item)}")
finally:
# tell queue we processed the item.
task_queue.task_done()
processed_count += 1
print(f"[{pid}] Task done!")
def main():
# just merely rapping codes in function namespace makes codes tiny bit faster
mp_manager = mp.Manager()
task_queue = mp_manager.Queue()
# populate queue
for n in range(100):
task_queue.put_nowait(n)
# start pool
with futures.ProcessPoolExecutor() as executor:
future_list = [executor.submit(child_process, task_queue) for _ in range(5)]
# can use executor.shutdown(wait=True) instead
# not required since all executor wait for all process to stop when exiting `with` block.
# hence, also no need to manually call executor.shutdown().
futures.wait(future_list)
if __name__ == '__main__':
main()
Output:
[18412] Started!
[18412] 0! = 1
[4680] Started!
[18412] 1! = 1
[2664] Started!
[18412] 2! = 2
[18412] 3! = 6
[17900] Started!
[18412] 4! = 24
[18412] 5! = 120
[4680] 6! = 720
[4680] 7! = 5040
[18412] 8! = 40320
[17900] 9! = 362880
[4680] 10! = 3628800
[18412] 11! = 39916800
...
[17900] 21! = 51090942171709440000
[4680] 22! = 1124000727777607680000
[2664] 23! = 25852016738884976640000
[16792] Started!
[18412] 24! = 620448401733239439360000
[17900] 25! = 15511210043330985984000000
...
[17900] 99! = 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000
[18412] Task done!
[17900] Task done!
[16792] Task done!
[2664] Task done!
[4680] Task done!