I have a big list of images list_img, say 20k, that I need to process multiple times with changing arguments out of a list params = [arg1, arg2, ...]. Ideally, I want to use multiple processes to do so. But I need all processes to first use arg1 and then arg2 on chunks of my list list_img. The processing time for each arg in params varies greatly. So if I distributed the list params over my processes instead of the list of images (core 1: arg1, core 2: arg2, ...), after a while most of the processes would be idle (finished) while very few were still crunching data.
My current (working) solution looks like this:
from multiprocessing import Pool
import numpy as np

def calc_image(argument, image):
    val = argument * image  # not the real process, just demo
    return val

if __name__ == "__main__":
    pool = Pool(processes=8)
    list_img = [np.ones((100, 100))] * 20000  # for demo only
    params = list(range(100))                 # for demo only
    for par in params:
        par_list = [par] * len(list_img)
        return_vals = pool.starmap(calc_image, zip(par_list, list_img))
    pool.close()
How can I avoid copying the list list_img every time the variable par changes in the for-loop? I would also like to avoid using global variables, if possible.
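The closest thing I know of is a plain Pool whose initializer/initargs send list_img to every worker a single time, roughly as sketched below. It has the obvious drawbacks that the images end up in a per-worker module-level variable (essentially the global I'd like to avoid) and that every worker holds the full list instead of a chunk:

from multiprocessing import Pool
import numpy as np

_worker_images = None  # per-worker module-level variable, filled by the initializer

def init_worker(images):
    # runs once in every worker when the pool starts
    global _worker_images
    _worker_images = images

def calc_image(argument, index):
    # only the small (argument, index) tuple is pickled per task,
    # the image itself is taken from the worker-local list
    return argument * _worker_images[index]

if __name__ == "__main__":
    list_img = [np.ones((100, 100))] * 20000  # for demo only
    params = list(range(100))                 # for demo only

    with Pool(processes=8, initializer=init_worker, initargs=(list_img,)) as pool:
        for par in params:
            return_vals = pool.starmap(
                calc_image, ((par, i) for i in range(len(list_img))))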
The following is my current workaround for the problem. I'm still interested in a better solution - maybe something more elegant.
I've switched from using Pool to a collection of Process objects:
from multiprocessing import Queue, Process
import numpy as np

def process_image(list_images, queue_in, queue_out):
    # each worker owns its own sub-list of images and waits for arguments
    for arg in iter(queue_in.get, "STOP"):
        processed_images = []
        for img in list_images:
            result = arg * img  # not the real process, just demo
            processed_images.append(result)
        queue_out.put(processed_images)

if __name__ == "__main__":
    list_img = [np.ones((100, 100))] * 20000  # for demo only
    splits = np.split(list_img, 4)            # split into 4 chunks
    my_pool = []
    queue_in = Queue()
    queue_out = Queue()

    # start a bunch of processes, each owning one part of the list of images,
    # so the list is only copied once
    for n in range(4):
        proc = Process(target=process_image, args=(splits[n], queue_in, queue_out))
        proc.start()
        my_pool.append(proc)

    params = list(range(100))  # for demo only
    for par in params:
        for n in my_pool:
            queue_in.put(par)  # each process gets the same argument and starts crunching
        return_vals = []
        for n in my_pool:
            return_vals.append(queue_out.get(block=True))  # wait for the results

    for element in my_pool:
        queue_in.put("STOP")   # tell the processes to shut down
    for element in my_pool:
        element.join()
The trick is that the list of images is copied only once, during the creation of the processes. Each worker gets its own sub-list of the total list during initialization, split beforehand.
Later, in a small loop, I provide the argument that should be used to process the images. As the processes block until queue_in contains an element, I just have to put the respective argument onto the queue exactly as many times as there are processes. This way the images are not copied again.
Copying the results back (from the workers to the main process) can't be avoided.
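That said, if the results are numpy arrays of fixed shape and dtype, the copy back could in principle be avoided by letting the workers write into a shared block via multiprocessing.shared_memory (Python 3.8+). This is only a rough sketch reusing the same queue scheme as above; the sizes and the plain multiplication are placeholders for the real processing:

from multiprocessing import Process, Queue, shared_memory
import numpy as np

N, H, W = 20000, 100, 100  # for demo only

def worker(in_name, out_name, start, stop, queue_in, queue_out):
    # attach to the shared blocks created by the main process
    shm_in = shared_memory.SharedMemory(name=in_name)
    shm_out = shared_memory.SharedMemory(name=out_name)
    images = np.ndarray((N, H, W), dtype=np.float64, buffer=shm_in.buf)
    results = np.ndarray((N, H, W), dtype=np.float64, buffer=shm_out.buf)
    for arg in iter(queue_in.get, "STOP"):
        results[start:stop] = arg * images[start:stop]  # written in place, not pickled
        queue_out.put((start, stop))                    # only a tiny "done" token goes back
    shm_in.close()
    shm_out.close()

if __name__ == "__main__":
    nbytes = N * H * W * np.dtype(np.float64).itemsize
    shm_in = shared_memory.SharedMemory(create=True, size=nbytes)
    shm_out = shared_memory.SharedMemory(create=True, size=nbytes)
    images = np.ndarray((N, H, W), dtype=np.float64, buffer=shm_in.buf)
    results = np.ndarray((N, H, W), dtype=np.float64, buffer=shm_out.buf)
    images[:] = 1.0  # for demo only

    queue_in, queue_out = Queue(), Queue()
    bounds = np.linspace(0, N, 5, dtype=int)  # chunk borders for 4 workers
    procs = [Process(target=worker,
                     args=(shm_in.name, shm_out.name, bounds[i], bounds[i + 1],
                           queue_in, queue_out))
             for i in range(4)]
    for p in procs:
        p.start()

    for par in range(100):  # for demo only
        for _ in procs:
            queue_in.put(par)
        for _ in procs:
            queue_out.get(block=True)  # wait until every chunk is done
        # `results` now holds the processed images for this `par`

    for _ in procs:
        queue_in.put("STOP")
    for p in procs:
        p.join()
    shm_in.close(); shm_in.unlink()
    shm_out.close(); shm_out.unlink()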