Having built a significant part of my code on dill serialization/pickling, I'm also trying to use pathos multiprocessing to parallelize my calculations. Pathos is a natural extension of dill.
When I try to run a nested
from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)
inside another ProcessingPool().map, I receive:
AssertionError: daemonic processes are not allowed to have children
E.g.:
from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))
yields
AssertionError: daemonic processes are not allowed to have children
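For context, the assertion comes from the stdlib multiprocessing that pathos builds on: pool workers are started as daemon processes, and the stdlib forbids daemon processes from starting children. A minimal reproduction of that restriction with the stdlib alone (POSIX-only, pinning the 'fork' start method; function names are illustrative):

```python
import multiprocessing

def start_grandchild():
    # Runs inside a daemon process; calling .start() here triggers
    # "AssertionError: daemonic processes are not allowed to have children".
    multiprocessing.Process(target=print, args=('never reached',)).start()

def exitcode_of_daemon_child():
    ctx = multiprocessing.get_context('fork')
    child = ctx.Process(target=start_grandchild, daemon=True)
    child.start()
    child.join()
    # Non-zero exit code: the AssertionError killed the daemon child.
    return child.exitcode

if __name__ == '__main__':
    print(exitcode_of_daemon_child())
```

Pool workers carry the same daemon flag as the child above, which is why a nested pool dies the same way.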
I tried using amap(...).get()
without success. This is on pathos 0.2.0.
What is the best way to allow for nested parallelization?
Update
I have to be honest at this point and confess that I removed the assertion "daemonic processes are not allowed to have children"
from pathos. I also built something that cascades KeyboardInterrupt
to workers, and to workers of those... Parts of the solution below:
import os
import signal
import sys
from time import sleep

from pathos.multiprocessing import ProcessingPool

def run_parallel(exec_func, exec_args, num_workers_i):
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    # get_pid_i (not shown) returns each worker's PID, e.g. via os.getpid()
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise

def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()
Seems to work from console and IPython notebook (with stop button), but not sure it's 100% correct in all corner cases.
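The Ctrl+C cascade above relies on a standard fact: os.kill(pid, signal.SIGINT) delivers the same signal pressing Ctrl+C would, and Python's default handler turns it into a KeyboardInterrupt in the receiving process. A minimal POSIX-only sketch, aimed at the current process:

```python
import os
import signal
import time

caught = False
try:
    os.kill(os.getpid(), signal.SIGINT)  # same signal as pressing Ctrl+C
    time.sleep(1)  # give the signal a chance to be delivered
except KeyboardInterrupt:
    caught = True

print('KeyboardInterrupt caught:', caught)
```

This is why hard_kill_pool's loop over worker PIDs behaves like pressing Ctrl+C in each worker before terminating the pool.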
I encountered exactly the same issue. In my case, the inner operation was the one that needed parallelism, so I made a ThreadingPool
of a ProcessingPool
. Here it is with your example:
from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ThreadingPool().map(refork, xrange(3))
You can even have another layer with another outer threading pool. Depending on your case, you can invert the order of these pools. However, you cannot have processes of processes. If really needed, see: https://stackoverflow.com/a/8963618/6522112. I haven't tried it myself yet, so I can't elaborate on this.
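For what it's worth, the same threads-over-processes layering can be sketched with the Python 3 stdlib concurrent.futures (no pathos needed): the outer layer uses threads, which are not daemonic processes, so each thread-level task may freely start its own process pool.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def triple(x):
    return 3 * x

def refork(x):
    # Each thread-level task spins up its own process pool.
    with ProcessPoolExecutor(max_workers=2) as procs:
        return list(procs.map(triple, range(5)))

if __name__ == '__main__':
    # Outer layer: threads, so the daemon-process restriction never applies.
    with ThreadPoolExecutor(max_workers=3) as threads:
        print(list(threads.map(refork, range(3))))
```

The order matters for the same reason as with pathos: processes inside threads work, but processes inside pool processes hit the daemon assertion.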