I'm trying to parallelize a piece of code given below using the multiprocessing module. Everything I try leads to each child process being run one after the other even though they all have different PIDs. I have tried:
I can't figure out what I am doing wrong.
fs.py:
import numpy as np
from time import sleep
import os
def f(r):
    res = np.arange(r[0], r[1])
    print(f'I am {os.getpid()}')
    sleep(10)
    print(f'I am {os.getpid()} and I am finished')
    return {'nums': res, 'dubs': res * 2}
playground.py:
import multiprocessing as mp
import numpy as np
from fs import f
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    p = ctx.Pool(4)
    with p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = [p.apply(f, (subset, )) for subset in subsets]
        print(res)
    print('Done!')
Command: python playground.py
Output:
I am 29881
I am 29881 and I am finished
I am 29882
I am 29882 and I am finished
I am 29881
I am 29881 and I am finished
[{'nums': array([0, 1, 2]), 'dubs': array([0, 2, 4])}, {'nums': array([3, 4, 5]),
'dubs': array([ 6, 8, 10])}, {'nums': array([6]), 'dubs': array([12])}]
Done!
When I use p.map() like this (on Linux Mint):
res = p.map(f, subsets)
then I get:
I am 1337328
I am 1337325
I am 1337327
I am 1337328 and I am finished
I am 1337325 and I am finished
I am 1337327 and I am finished
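Maybe you used map() the wrong way, for example like this:
res = [p.map(f, (subset, )) for subset in subsets]
That calls map() once per subset, and every call blocks until its single task has finished, so the tasks still run one after the other. Pass the whole list to a single map() call instead: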
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        res = p.map(f, subsets)
        print(res)
    print('Done!')
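To see why the list comprehension with apply() in the question ran the tasks one after the other, here is a small timing sketch. It makes a few assumptions: it uses multiprocessing.pool.ThreadPool instead of a process pool (it offers the same apply()/map() methods and lets the example run as a single file), and slow_double and timed are hypothetical helpers that are not part of the original code.

```python
import time
from multiprocessing.pool import ThreadPool  # assumption: thread pool with the same apply/map API as Pool


def slow_double(x):
    """Hypothetical stand-in for f(): sleep briefly, then return 2 * x."""
    time.sleep(0.3)
    return x * 2


def timed(label, func):
    """Call func(), print how long it took, and return (result, seconds)."""
    start = time.perf_counter()
    result = func()
    elapsed = time.perf_counter() - start
    print(f'{label}: {result} in {elapsed:.2f}s')
    return result, elapsed


with ThreadPool(3) as pool:
    items = [1, 2, 3]
    # apply() blocks until its single task is done, so the calls run back to back.
    serial_res, serial_time = timed(
        'apply in a loop',
        lambda: [pool.apply(slow_double, (i,)) for i in items],
    )
    # map() hands out all tasks first and only then waits for the results.
    parallel_res, parallel_time = timed(
        'single map call',
        lambda: pool.map(slow_double, items),
    )
```

Both variants return the same list, but the apply() loop should take roughly the sum of the sleep times, while the single map() call should take roughly one sleep.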
For apply_async you would need two for-loops: the first one submits all the tasks, and the second one collects the results.
items = [p.apply_async(f, (subset, )) for subset in subsets]
res = [x.get() for x in items]
print(res)
Both loops have to be inside the with block, because the pool is terminated as soon as the block exits:
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    with ctx.Pool(4) as p:
        subsets = [[0, 3], [3, 6], [6, 7]]
        items = [p.apply_async(f, (subset, )) for subset in subsets]
        print(items)
        res = [x.get() for x in items]
        print(res)
    print('Done!')
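The reason for keeping the two loops separate can be sketched like this: calling get() right after apply_async() in the same comprehension waits for each task before the next one is submitted, which behaves like plain apply(). As an assumption for the sake of a single-file example, the sketch uses multiprocessing.pool.ThreadPool (same apply_async() interface as the process pool), and slow_square is a hypothetical stand-in for f.

```python
import time
from multiprocessing.pool import ThreadPool  # assumption: thread pool with the same apply_async API as Pool


def slow_square(x):
    """Hypothetical stand-in for f(): sleep briefly, then return x squared."""
    time.sleep(0.3)
    return x * x


with ThreadPool(3) as pool:
    numbers = [1, 2, 3]

    # One loop: get() waits for each task before the next one is submitted.
    start = time.perf_counter()
    one_loop = [pool.apply_async(slow_square, (n,)).get() for n in numbers]
    one_loop_time = time.perf_counter() - start

    # Two loops: submit everything first, then collect the results in order.
    start = time.perf_counter()
    pending = [pool.apply_async(slow_square, (n,)) for n in numbers]
    two_loops = [task.get() for task in pending]
    two_loops_time = time.perf_counter() - start

print(f'one loop:  {one_loop} in {one_loop_time:.2f}s')
print(f'two loops: {two_loops} in {two_loops_time:.2f}s')
```

The results are identical and come back in submission order in both cases; only the two-loop version lets the workers overlap.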