Here is my code for executing tasks in parallel. The program runs shell commands that resize all the icons using the magick program.
```python
import glob
from subprocess import run
import subprocess
from multiprocessing import Process, Pool, cpu_count
import time
import os


class ParallelProcess:
    def __init__(self, files):
        """Constructor

        Args:
            files (List[str]): A list of icon files.
        """
        self.files = files

    def parallel_resize(self, file):
        """Task function for multiprocessing Pool.

        Args:
            file (TYPE): Icon file path
        """
        print("Resizing : {}".format(file))
        cmnd = "magick {} -resize 100x100^ -gravity center -extent 100x100 ./out/{}".format(file, os.path.basename(file))
        try:
            data = run(cmnd, capture_output=True, shell=True)
            errors = data.stderr.splitlines()
            if errors:
                for e in errors:
                    print(e.decode())
                exit()
        except subprocess.CalledProcessError as e:
            print(e.stderr)
            exit()

    def execute_parallel(self):
        '''Execute tasks in parallel.'''
        start_time = time.perf_counter()
        try:
            pool = Pool()
            pool.map(self.parallel_resize, self.files, 4)
        except Exception as e:
            print(e.stderr)
        end_time = time.perf_counter()
        print("Program finished in {} seconds.".format(end_time - start_time))


if __name__ == '__main__':
    t = ParallelProcess(glob.glob("./rabbits/*.ico"))
    t.execute_parallel()
```
The results are:
Serial processing finished in 5.773 seconds.
Parallel processing finished in 2.01 seconds. chunk size = 1
Parallel processing finished in 2.181 seconds. chunk size = 4
Parallel processing finished in 2.2651441 seconds. chunk size = 8
The results above were measured on my Intel i9, 8-processor (4 logical) laptop with different chunk sizes. I am not able to figure out how to choose the correct chunk size. I will be using the above code to process thousands of images.
How do I handle interrupts? I have purposely copied a corrupt icon file into the directory. When magick processes that file, errors are produced. In that case, processing should stop immediately, but I haven't been able to figure out how to do that correctly, although I'm still trying.
Practically, I can think of two ways to stop, but neither works reliably. How do you handle an error interruption correctly?
Prashant
TLDR; Use imap_unordered, set chunksize=1, and move the worker out of the class definition.
TLRDR; Use threads instead.
There is a cost to passing data between the parent process and its workers. If the operation performed is short, this cost can dominate, making multiprocessing slower than doing the work serially. By default, the pool will send / receive in chunks to minimize this overhead.
But there are downsides. If some data processes faster than others, you have to wait for the chunk that just happened to have the most expensive calculations to complete, even if all of the other workers are done and could have been used for the expensive stuff. And other problems like a worker raising an exception are not seen until chunk processing completes.
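To see that head-of-the-line blocking effect, here is a minimal sketch (the work function, sleep durations, and pool size are made up for illustration) where one expensive item delays every item that was batched into the same chunk:

```python
from multiprocessing.pool import ThreadPool
import time

def work(x):
    # item 0 stands in for one expensive image; the rest are cheap
    time.sleep(0.5 if x == 0 else 0.1)
    return x

def timed_run(chunksize):
    start = time.perf_counter()
    with ThreadPool(4) as pool:
        results = pool.map(work, range(8), chunksize)
    return results, time.perf_counter() - start

results1, t1 = timed_run(1)
results4, t4 = timed_run(4)
print("chunksize=1: {:.2f}s  chunksize=4: {:.2f}s".format(t1, t4))
```

With chunksize=1 the slow item occupies one worker while the other three drain the cheap items, but with chunksize=4 the three cheap items packed behind the slow one have to wait for it.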
Pool.map and Pool.imap both maintain the order of operations, so they have to accumulate results, and this also decreases responsiveness when errors are raised. Since you don't care about maintaining order, use imap_unordered instead.
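As a small illustration of the difference (using a toy work function where lower-numbered items are deliberately slower), imap yields results in submission order while imap_unordered yields them as they complete:

```python
from multiprocessing.pool import ThreadPool
import time

def work(x):
    # lower-numbered items sleep longer, so they finish last
    time.sleep((5 - x) * 0.05)
    return x

with ThreadPool(5) as pool:
    ordered = list(pool.imap(work, range(5)))
    unordered = list(pool.imap_unordered(work, range(5)))

print(ordered)    # submission order preserved: [0, 1, 2, 3, 4]
print(unordered)  # completion order, e.g. [4, 3, 2, 1, 0]
```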
Be careful about serializing more stuff than you need to. Your worker self.parallel_resize means that the whole ParallelProcess object needs to be serialized for the workers. But that is fragile and unnecessary. Make your worker a function instead.
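To see the serialization cost directly, compare pickling a bound method (which drags the whole instance along) with pickling a plain module-level function; the class and file-list size here are hypothetical stand-ins:

```python
import pickle

def resize(path):
    # module-level stand-in for the real worker
    return path.upper()

class ParallelProcess:
    def __init__(self, files):
        self.files = files  # potentially large state
    def resize(self, path):
        return path.upper()

job = ParallelProcess(["rabbit.ico"] * 100_000)

# Pickling the bound method serializes the entire ParallelProcess
# instance, including its big file list ...
bound = len(pickle.dumps(job.resize))
# ... while a module-level function pickles as just a name reference.
plain = len(pickle.dumps(resize))
print(bound, plain)
```

This is exactly what a process Pool does to every task it sends to a worker, which is why the worker should be a plain function.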
Finally, why go MP? All you are doing is executing a program which is a separate process anyway. Just use a thread pool and save the cost of using processes.
```python
import glob
from subprocess import run
import subprocess
from multiprocessing.pool import ThreadPool
import time
import os


def parallel_resize(file):
    """Task function for Pool.

    Args:
        file (TYPE): Icon file path
    """
    print("Resizing : {}".format(file))
    cmnd = "magick {} -resize 100x100^ -gravity center -extent 100x100 ./out/{}".format(file, os.path.basename(file))
    # let errors propagate to caller
    data = run(cmnd, capture_output=True, shell=True, check=True)
    if data.stderr:
        # treat any stderr traffic as an error
        raise subprocess.CalledProcessError(data.returncode, data.args,
                                            data.stdout, data.stderr)


class ParallelProcess:
    def __init__(self, files):
        """Constructor

        Args:
            files (List[str]): A list of icon files.
        """
        self.files = files

    def execute_parallel(self):
        '''Execute tasks in parallel.'''
        start_time = time.perf_counter()
        try:
            with ThreadPool() as pool:
                list(pool.imap_unordered(parallel_resize, self.files, chunksize=1))
        except subprocess.CalledProcessError as e:
            print(e.stderr.decode())
        except Exception as e:
            print(e)
        end_time = time.perf_counter()
        print("Program finished in {} seconds.".format(end_time - start_time))


if __name__ == '__main__':
    t = ParallelProcess(glob.glob("./rabbits/*.ico"))
    t.execute_parallel()
```
This code should easily beat your chunksize=1 test case and should be responsive to errors, to the extent that other threads will finish their current call to magick but not progress further.
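As a sketch of that error behaviour (with a toy worker standing in for magick, and item 3 standing in for the corrupt icon): iterating imap_unordered re-raises the worker's exception in the parent, and leaving the with block terminates the pool, so the remaining files are never processed:

```python
from multiprocessing.pool import ThreadPool

def work(x):
    # x == 3 stands in for the corrupt icon file
    if x == 3:
        raise ValueError("corrupt file: {}".format(x))
    return x

done = []
try:
    with ThreadPool(2) as pool:
        for result in pool.imap_unordered(work, range(100), chunksize=1):
            done.append(result)
except ValueError as e:
    print("stopped early:", e)

print(len(done), "files processed before the error stopped the pool")
```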