
Parallel processing of tasks in Python using multiprocessing.Pool


Here is my code for executing tasks in parallel. The program runs a shell command for each icon to resize it with the magick program.

import glob
from subprocess import run
import subprocess
from multiprocessing import Process, Pool, cpu_count
import time
import os

class ParallelProcess:
    def __init__(self, files):
        """Constructor
        
        Args:
            files (List[str]): A list of icon files. 
        """
        self.files = files

    def parallel_resize(self, file):
        """Task function for multiprocessing Pool.
        
        Args:
            file (str): Icon file path.
        """
        
        print("Resizing : {}".format(file))
        cmnd = "magick {} -resize 100x100^ -gravity center -extent 100x100 ./out/{}".format(file, os.path.basename(file))
        try:
            data = run(cmnd, capture_output=True, shell=True)
            errors = data.stderr.splitlines()
            if errors:
                for e in errors:
                    print(e.decode())
                exit()
        except subprocess.CalledProcessError as e:
            print(e.stderr)
            exit()

    def execute_parallel(self):
        '''Execute tasks in parallel.
        '''
        start_time = time.perf_counter()
          
        try:
            pool = Pool()  
            pool.map(self.parallel_resize, self.files, 4)
        except Exception as e:
            print(e.stderr)

        end_time = time.perf_counter()
        print("Program finished in {} seconds.".format(end_time-start_time))

if __name__ == '__main__':
    t = ParallelProcess(glob.glob("./rabbits/*.ico"))
    t.execute_parallel()

The results are:

Serial processing finished in 5.773 seconds.
Parallel processing finished in 2.01 seconds. chunk size = 1
Parallel processing finished in 2.181 seconds. chunk size = 4
Parallel processing finished in 2.265 seconds. chunk size = 8

The results above, on my Intel i9 laptop with 8 logical processors (4 physical cores), show the effect of different chunk sizes. I am not able to figure out how to choose the correct chunk size. I will be using the above code to process thousands of images.

How do I handle errors that should interrupt processing? I have purposely copied a corrupt icon file into the directory. When magick processes that file, it produces errors. In that case, the processing should stop immediately, but I have not been able to get this to work correctly, although I am still trying.

Practically, I can think of two ways to stop:

  1. As soon as an error is produced, finish the files in the queue and stop.
  2. Immediately stop, even drop the tasks in the queue.

So how do you handle an error interruption correctly?

Prashant


Solution

  • TL;DR: Use imap_unordered, set chunksize=1, and move the worker out of the class definition.

    Even shorter: use threads instead.

    There is a cost to passing data between the parent process and its workers. If the operation performed is short, this cost can dominate, making multiprocessing slower than doing the work serially. By default, Pool.map sends work to the workers and receives results back in chunks to minimize this overhead.
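    To see that overhead in isolation, here is a minimal sketch (a hypothetical toy workload, not your resize task) that times Pool.map over a trivially cheap function with two different chunk sizes; with chunksize=1 every item pays the full round-trip cost:

    import time
    from multiprocessing import Pool

    def cheap_task(x):
        # so cheap that the cost of shipping items to workers dominates
        return x * x

    if __name__ == '__main__':
        for chunk in (1, 64):
            start = time.perf_counter()
            with Pool() as pool:
                pool.map(cheap_task, range(100_000), chunksize=chunk)
            print("chunksize={}: {:.3f} seconds".format(chunk, time.perf_counter() - start))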

    But chunking has downsides. If some items process faster than others, you have to wait for the chunk that happened to contain the most expensive calculations, even if all of the other workers are finished and could have helped with that work. And problems such as a worker raising an exception are not seen until its whole chunk completes.

    Pool.map and Pool.imap both maintain the order of results, so they have to buffer results that arrive out of order, which further delays the point at which an error is re-raised in the parent. Since you don't care about maintaining order, use imap_unordered instead.
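    Here is a small sketch of that behavior, using a made-up worker rather than anything from your code: with imap_unordered and chunksize=1, the exception from the bad item is re-raised in the parent as soon as that item's result is consumed:

    from multiprocessing import Pool

    def work(x):
        if x == 3:
            raise ValueError("bad item: {}".format(x))
        return x

    if __name__ == '__main__':
        with Pool() as pool:
            try:
                for result in pool.imap_unordered(work, range(10), chunksize=1):
                    print("done:", result)
            except ValueError as e:
                # raised as soon as the failing item's result is fetched;
                # leaving the with-block terminates the pool
                print("stopped early:", e)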

    Be careful not to serialize more than you need to. Using the bound method self.parallel_resize as the worker means the whole ParallelProcess object must be pickled and shipped to the workers. That is fragile and unnecessary; make the worker a module-level function instead.
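    You can see the cost directly with pickle. The Holder class below is hypothetical, but the effect is the same for your ParallelProcess: pickling the bound method drags the whole instance along with it.

    import pickle

    def plain_worker(x):
        return x + 1

    class Holder:
        def __init__(self):
            self.payload = list(range(100_000))  # pickled along for the ride

        def method_worker(self, x):
            return x + 1

    h = Holder()
    print(len(pickle.dumps(plain_worker)))     # tiny: just a module/name reference
    print(len(pickle.dumps(h.method_worker)))  # large: includes self and its payload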

    Finally, why use multiprocessing at all? All the worker does is launch magick, which is a separate process in its own right, and the GIL is released while subprocess.run waits for it. Just use a thread pool and save the cost of spawning worker processes.

    import glob
    from subprocess import run
    import subprocess
    from multiprocessing.pool import ThreadPool
    import time
    import os
    
    def parallel_resize(file):
        """Task function for Pool.
        
        Args:
            file (str): Icon file path.
        """
        
        print("Resizing : {}".format(file))
        cmnd = "magick {} -resize 100x100^ -gravity center -extent 100x100 ./out/{}".format(file, os.path.basename(file))
        # let errors propagate to the caller; check=True raises CalledProcessError
        data = run(cmnd, capture_output=True, shell=True, check=True)
        if data.stderr:
            # treat any stderr traffic as error
            raise subprocess.CalledProcessError(data.returncode, data.args, 
                data.stdout, data.stderr)
    
    
    class ParallelProcess:
        def __init__(self, files):
            """Constructor
            
            Args:
                files (List[str]): A list of icon files. 
            """
            self.files = files
    
        def execute_parallel(self):
            '''Execute tasks in parallel.
            '''
            start_time = time.perf_counter()
            
            try:
                with ThreadPool() as pool: 
                    list(pool.imap_unordered(parallel_resize, self.files, chunksize=1))
            except subprocess.CalledProcessError as e:
                print(e.stderr.decode())
            except Exception as e:
                print(e)
    
            end_time = time.perf_counter()
            print("Program finished in {} seconds.".format(end_time-start_time))
    
    if __name__ == '__main__':
        t = ParallelProcess(glob.glob("./rabbits/*.ico"))
        t.execute_parallel()
        
    

    This code should comfortably beat your chunksize=1 test case, and it is responsive to errors in the sense that other threads will finish their current call to magick but will not start any further work.
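    If you want to choose explicitly between the two stop policies you listed, you can consume the iterator by hand. This is only a sketch built on the parallel_resize function above (the resize_all helper and its stop_immediately flag are illustrative names, not an existing API); leaving the with-block calls pool.terminate(), which drops anything still queued.

    from multiprocessing.pool import ThreadPool
    import subprocess

    def resize_all(files, stop_immediately=True):
        """Run parallel_resize over files, collecting errors.

        stop_immediately=True  -> option 2: drop queued tasks on first error.
        stop_immediately=False -> option 1: let queued files finish, then stop.
        """
        errors = []
        with ThreadPool() as pool:
            results = pool.imap_unordered(parallel_resize, files)
            while True:
                try:
                    next(results)
                except StopIteration:
                    break  # every file has been processed
                except subprocess.CalledProcessError as e:
                    errors.append(e)
                    if stop_immediately:
                        break  # leaving the with-block drops queued tasks
                    # otherwise keep consuming; queued files still run
        return errors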