With the matplotlib backend set to 'PDF', I have wrapped my plotting in a function whose job is to produce the plots, like so:
def func(arg1, arg2, arg3, num):
    try:
        fig = pdf.savefig(num + arg1 + arg2, bbox_inches="tight")
        return fig
    except Exception:
        return None
I have been able to obtain the desired results (plots) serially with the following implementation:
data = list(range(100))
results = [func(arg1, arg2, arg3, num) for num in data]
I have attempted to implement this in a parallelized way using both pool.map() and pool.apply_async() methods as follows:
The pool.map() implementation:
if __name__ == "__main__":
    try:
        pool = Pool()
        data = list(range(50))
        funct = partial(func, arg1, arg2, arg3)
        results = pool.map(funct, data)
    finally:
        pool.close()
        pool.join()
The pool.apply_async() implementation:
if __name__ == "__main__":
    try:
        pool = Pool()
        results = []
        data = list(range(50))
        result_objects = [pool.apply_async(func, args=(arg1, arg2, arg3, num)) for num in data]
        results = [r.get() for r in result_objects]
    finally:
        pool.close()
        pool.join()
With both parallel implementations, I have noticed that out of the 50 simulations only six produce a readable PDF file; the others are corrupt. When I change the number of simulations to, say, ten, only three produce a readable PDF file while the others are corrupt.
I don't understand why only a few plots come out properly while the others are corrupted.
I am running the multiprocessing on a 4-core Ubuntu 18.04 Linux machine.
I have come across the multiprocessing.Queue() class, which handles communication between the master and child processes. I guess there is some fault in how that communication currently happens, resulting in corrupt images for most of the iterations.
I would like help incorporating multiprocessing.Queue() into the code to overcome this problem.
I was under the presumption that Queue() might have some magical abilities to iron out the wrinkles in the code.
Going through several posts, I realized that Lock() had the potential to unlock my misery.
The following modified code works fine:
The pool.map() implementation:
from multiprocessing import Pool, Lock, Manager
from functools import partial
import time

def func(arg1, arg2, arg3, lock, num):
    with lock:
        ...  # your code #

if __name__ == "__main__":
    try:
        pool = Pool()
        m = Manager()
        lock = m.Lock()
        data = list(range(1000))
        funct = partial(func, arg1, arg2, arg3, lock)
        results = pool.map(funct, data)
    finally:
        pool.close()
        pool.join()
The pool.apply_async() implementation:
from multiprocessing import Pool, Lock, Manager
from functools import partial
import time

def func(num, arg1, arg2, arg3, lock):
    with lock:
        ...  # your code #

if __name__ == "__main__":
    try:
        pool = Pool()
        m = Manager()
        lock = m.Lock()
        results = []
        data = list(range(1000))
        result_objects = [pool.apply_async(func, args=(num, arg1, arg2, arg3, lock)) for num in data]
        results = [r.get() for r in result_objects]
    finally:
        pool.close()
        pool.join()