I'm using Python's multiprocessing.Pool and its .apply_async() method to run several workers concurrently.
But I run into a problem when I use the pool as a context manager (with) instead of creating an instance and closing it manually.
Here's what I've done so far:
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []

def log_result(result):
    result_list.append(result)
tick = time()
pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i,), callback=log_result)
pool.close()
pool.join()
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.
Out:
1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5
tick = time()
with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)
print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iterations have been done.
Out:
0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.017550945281982
5
Extra:
Now the problem: I want to use multiprocessing.Pool (which dates back to Python 2) with a with statement,
just like the Python 3 concurrent.futures version above, but the tasks do not complete:
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.
Out:
Total elapsed time: 0.10628008842468262
[]
5
However, if I place a sleep(1) after each pool.apply_async(...),
some of the shorter tasks do finish (the sleep acts as a partial block):
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iterations have been done.
Out:
0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time: 6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5
What am I missing?
concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.
concurrent.futures.Executor calls shutdown(wait=True) on exit, which effectively waits for all enqueued jobs to finish. From the documentation:
You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)
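In other words, the with block in your second example behaves roughly like this explicit version (a minimal sketch reusing the worker from the question):
tick = time()
executor = ProcessPoolExecutor()
for i in range(6):
    executor.submit(worker, i)
executor.shutdown(wait=True)  # Blocks until every submitted job has finished
print('Total elapsed time: ', time() - tick)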
multiprocessing.Pool, on the other hand, calls terminate() on exit instead of close() followed by join(), which prematurely interrupts all ongoing jobs. From the documentation:
Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().
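That means your failing with Pool() example expands roughly to the following sketch; terminate() fires as soon as the loop ends, before the workers get a chance to run:
pool = Pool()
try:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
finally:
    pool.terminate()  # Stops the worker processes immediately; queued jobs are dropped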
If you want to use multiprocessing.Pool together with its context manager, you need to wait for the results yourself:
with Pool() as pool:
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result)
                     for i in range(6)]
    for async_result in async_results:
        async_result.wait()  # Block until this job is done, before the pool is terminated
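Alternatively, keep the context manager but call close() and join() inside the with block, so everything has already finished by the time __exit__() calls terminate():
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
    pool.close()  # No more tasks will be submitted to this pool
    pool.join()   # Wait for all workers to finish before __exit__() runs
print('Total elapsed time: ', time() - tick)
print(result_list)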