I am running a simulation that is parallelizable. For each process, I'm interested in parameters that are stored in dictionaries: for example, a dictionary for layers, a dictionary for loss, etc. The good news is that the processes don't need to share any objects between them. All I want is to get the result back from each process and combine them all into one big file.
My current attempt is inefficient and a bit uncomfortable to work with.
It is inefficient because I am retrieving the results from the processes through a shared memory object. This means a mutex locks the shared object every time a process saves its result into it. That is fine when all processes genuinely need to access the same object, but in my case I only want each process's result back, so passing a shared object between processes is overkill. I just want each process to return its dictionaries when it's done.
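For context, this is roughly the pattern I mean: a minimal sketch with an explicit Lock guarding writes to a shared dict (my actual code, shown further down, uses Manager dicts, which serialize access internally; the explicit lock here just illustrates the mutex contention):

import multiprocessing as mp

def worker(process_num, shared, lock):
    result = {"loss": 0.1 * process_num}  # stand-in for the real computation
    with lock:                            # every write contends for the mutex
        shared[process_num] = result

if __name__ == "__main__":
    manager = mp.Manager()
    shared = manager.dict()
    lock = mp.Lock()
    procs = [mp.Process(target=worker, args=(i, shared, lock)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(dict(shared))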
It is uncomfortable to work with because every time I need to return another dictionary, I have to create a separate managed dictionary and adjust the code everywhere to pass it around. It would be nice to group all the dictionaries into one large dictionary (a dictionary of dictionaries), but for some reason I can share a dictionary object, yet cannot share a dictionary of dictionaries.
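Here is a minimal sketch of the nested-dictionary problem (hypothetical worker; the in-place write to the nested plain dict never makes it back to the parent process, while the top-level assignment on the proxy does):

import multiprocessing as mp

def worker(shared):
    shared["layers"]["w1"] = 0.5   # mutates a local copy of the nested dict; lost
    shared["loss"] = {"mse": 0.1}  # top-level assignment on the proxy; kept

if __name__ == "__main__":
    manager = mp.Manager()
    shared = manager.dict()
    shared["layers"] = {}          # a plain dict nested inside the managed dict
    p = mp.Process(target=worker, args=(shared,))
    p.start()
    p.join()
    print(dict(shared))            # {'layers': {}, 'loss': {'mse': 0.1}}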
Here's a snippet of my current solution.
import multiprocessing as mp
from multiprocessing import Process

# set up the shared dictionaries for the processes
manager = mp.Manager()
layers_dict = manager.dict()
loss_dict = manager.dict()

for k, v in self.output_nodes.items():
    # fork the processes
    child_processes = []
    for process_num in range(N_PROCESSES):
        p = Process(target=EqProp(self).train,
                    args=[X, Y, self.batch_size, process_num,
                          N_PROCESSES, layers_dict, loss_dict])
        p.start()
        child_processes.append(p)
    # wait until all processes are done
    for process in child_processes:
        process.join()
Using multiprocessing.Pool.starmap, you can just return each dict from each process and combine them all into one big dict at the end:
import multiprocessing

# prepare one argument tuple per process; train now returns its dicts,
# so the shared manager dicts are no longer needed
child_processes_args = []
for k, v in self.output_nodes.items():
    for process_num in range(N_PROCESSES):
        child_processes_args.append(
            (X, Y, self.batch_size, process_num, N_PROCESSES))

# run execution; the pool forks, distributes, and joins for you
with multiprocessing.Pool(processes=N_PROCESSES) as pool:
    results = pool.starmap(EqProp(self).train, child_processes_args)

# get results: each entry is whatever train returned
results_dict = {}
for result in results:
    results_dict[<your-key>] = result
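If it helps, here is a self-contained, runnable sketch of the same pattern with a dummy train function standing in for EqProp(self).train (all names below are placeholders, not your actual code):

import multiprocessing

N_PROCESSES = 3

def train(X, Y, batch_size, process_num, n_processes):
    # each worker builds and returns ordinary dicts; no shared memory, no locks
    layers = {"w1": 0.1 * process_num}
    loss = {"mse": sum(Y) / len(Y)}
    return {"layers": layers, "loss": loss}

if __name__ == "__main__":
    X, Y, batch_size = [1, 2, 3], [0.5, 0.25, 0.125], 2
    args = [(X, Y, batch_size, i, N_PROCESSES) for i in range(N_PROCESSES)]
    with multiprocessing.Pool(processes=N_PROCESSES) as pool:
        results = pool.starmap(train, args)
    # combine into one big dict of dicts, keyed by process number
    results_dict = {i: result for i, result in enumerate(results)}
    print(results_dict)

Because the returned objects are plain dictionaries, nesting them into a dictionary of dictionaries works without any Manager proxies.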