Tags: python, multiprocessing, pool, starmap

Repeatedly process big list of images with changing parameters using multiple cores in python


I have a big list of images list_img, say 20k entries, that I need to process multiple times with changing arguments out of a list params = [arg1, arg2, ...]. Ideally, I want to use multiple processes to do so. But I need all processes to first use arg1 and then arg2 on chunks of my list list_img. The processing time for each arg in params varies greatly, so if I distributed the list params over my processes instead of the list of images (core 1: arg1, core 2: arg2, ...), after a while most of the processes would be idle (finished) while very few would still be crunching data.

My current (working) solution looks like this:

from multiprocessing import Pool
import numpy as np

def calc_image(argument, image):
    val = argument * image    # not the real process, just demo
    return val

if __name__ == "__main__":
    pool = Pool(processes=8)
    list_img = [np.ones((100, 100))] * 20000    # for demo only
    params = list(range(100))    # for demo only
    for par in params:
        par_list = [par] * len(list_img)
        # starmap pickles every (par, image) pair, so the whole image list
        # is serialized and sent to the workers again on each iteration
        return_vals = pool.starmap(calc_image, zip(par_list, list_img))
    pool.close()
    pool.join()

How can I avoid copying the list list_img every time the variable par changes in the for loop? I would also like to avoid using global variables, if possible.


Solution

  • This is my current workaround for the problem. I'm still interested in a better, more elegant solution.

    I've switched from using Pool to a collection of Process workers:

    from multiprocessing import Queue, Process
    import numpy as np
    
    def process_image(list_images, queue_in, queue_out):
        for arg in iter(queue_in.get, "STOP"):
            processed_images = []
            for img in list_images:
                result = arg * img
                processed_images.append(result)
            queue_out.put(processed_images)
    
    if __name__ == "__main__":
        list_img = [np.ones((100, 100))] * 20000    # for demo only
        splits = np.split(list_img, 4)   # split into 4 chunks; each chunk ends up as one NumPy array
        my_pool = []
        queue_in = Queue()
        queue_out = Queue()
        # start a bunch of processes, each owning one chunk of the image list,
        # so the list is only copied once
        for n in range(4):
            proc = Process(target=process_image, args=(splits[n], queue_in, queue_out))
            proc.start()
            my_pool.append(proc)
        params = list(range(100))    # for demo only
        for par in params:
            for n in my_pool:
                queue_in.put(par)    # each process gets the same element and starts crunching
            return_vals = []
            for n in my_pool:
                return_vals.append(queue_out.get(block=True))    # wait for results; arrival order is not guaranteed
        for element in my_pool:
            queue_in.put("STOP")    # sentinel tells each worker to exit its loop
        for element in my_pool:
            element.join()
    

    The trick is that the list of images is copied only once, during the creation of the processes: each worker gets its own sub-list of the total list, split beforehand, at initialization. Later, a small loop provides the argument that should be used to process the images. As the processes block until queue_in contains an element, I just have to put the respective argument on the queue exactly as many times as there are processes. This way the images are not copied again.
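
    One caveat with the collection loop above: queue_out yields the chunks in completion order, which need not match the split order. If order matters, a small variation (a sketch only; worker_id is a name introduced here, not in the code above) can tag each result with the worker's index:

    def process_image(worker_id, list_images, queue_in, queue_out):
        for arg in iter(queue_in.get, "STOP"):
            processed_images = [arg * img for img in list_images]
            queue_out.put((worker_id, processed_images))    # tag each result with its chunk index

    # when creating the workers, pass the index along:
    #     proc = Process(target=process_image, args=(n, splits[n], queue_in, queue_out))
    # and reassemble the results in split order:
    #     by_id = dict(queue_out.get() for _ in my_pool)
    #     return_vals = [by_id[i] for i in range(len(my_pool))]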

    Copying the results back (from the worker processes to the main process) can't be avoided.
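
    A possibly more elegant variant of the same idea, for comparison: Pool accepts an initializer, so the image list can be shipped to every worker once, and each later task only carries a small (par, lo, hi) tuple. This is just a sketch under my own naming (init_worker, calc_chunk, and bounds are not from the code above), and it only partially satisfies the "no globals" wish, since it relies on a module-level variable inside the workers and every worker holds the full list in memory:

    from multiprocessing import Pool
    import numpy as np

    _images = None    # filled in once per worker by the initializer

    def init_worker(images):
        # runs once in every worker process; the image list is
        # transferred to each worker a single time, not per parameter
        global _images
        _images = images

    def calc_chunk(par, lo, hi):
        # each task processes one slice of the worker-local image list
        return [par * img for img in _images[lo:hi]]

    if __name__ == "__main__":
        list_img = [np.ones((100, 100))] * 20000    # for demo only
        params = list(range(100))                   # for demo only
        n_chunks = 8
        step = len(list_img) // n_chunks
        bounds = [(i * step, (i + 1) * step) for i in range(n_chunks)]
        with Pool(processes=8, initializer=init_worker, initargs=(list_img,)) as pool:
            for par in params:
                # only the small (par, lo, hi) tuples are sent per iteration
                chunks = pool.starmap(calc_chunk, [(par, lo, hi) for lo, hi in bounds])
                return_vals = [img for chunk in chunks for img in chunk]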