I have a pretty plain vanilla implementation of concurrent.futures.ProcessPoolExecutor -- something like this (using Python 3.6):
files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))
While the processor is an instance of any of a number of available processor classes, they all share the process method, which looks roughly like this:
def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
            session.commit()
The implementations of the _get_records and _save_output methods vary per class, but my problem is with error handling. I'm deliberately testing it so that one of those two methods runs out of memory, but I would expect the except block above to catch it and move on to the next file -- and this is precisely what happens when I run the code in a single process.
If I use ProcessPoolExecutor as described above, it raises the BrokenProcessPool exception and kills all execution:
Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I can of course catch the BrokenProcessPool in the calling code, but I'd prefer to handle the error internally and proceed to the next file.
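For reference, catching it in the calling code would look roughly like the sketch below, using submit instead of map so each file gets its own future. Once the pool is broken, though, the remaining futures all fail with the same BrokenProcessPool, so this only lets me report the failures per file rather than actually continuing:

with concurrent.futures.ProcessPoolExecutor() as executor:
    # One future per file, so each failure can be inspected individually.
    futures = {executor.submit(processor.process, f): f for f in files}
    for future in concurrent.futures.as_completed(futures):
        file = futures[future]
        try:
            future.result()
        except concurrent.futures.process.BrokenProcessPool as ex:
            # Reported per file, but the pool is unusable from here on.
            log.error(f"Worker died while processing {file.name}: {ex}")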
I also tried using the standard multiprocessing.Pool object, like this:
with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)
In this case, the behaviour is even weirder: after starting to process the first two files, which raise the out-of-memory error, it moves on to the later files, which are smaller and so get processed completely. However, the except block apparently never gets triggered (no log messages, no error_time), and the application just hangs, neither finishing nor doing anything, until killed manually.
I was hoping that the try..except block would make each process self-contained, handling its own errors without affecting the main application. Any ideas how to achieve that?
So, after a lot of debugging (and with due credit to @RomanPerekhrest's suggestion to check the executor object), I've figured out the cause. As described in the question, the test data consisted of a number of files, two of which were quite large (over 1 million lines of CSV each). Each of those two caused my test machine (a 2GB VM) to choke, but in different ways: the first, which was larger, caused a regular out-of-memory error that was handled by the except block, while the second simply caused a SIGKILL. Without exploring too much, I suspect that the larger file simply couldn't fit in memory on read (done in the _get_records method), while the smaller one could, but the subsequent manipulation of it (done in _save_output) caused the overflow and killed the process.
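The difference is easy to reproduce with a minimal, self-contained sketch (Unix-only, not my application code): a worker that raises an ordinary exception -- even a MemoryError -- reports it per task, while a worker that gets SIGKILLed takes the whole pool down:

import concurrent.futures
import os
import signal

def raises(_):
    # An ordinary exception raised in the worker is pickled and re-raised
    # in the parent when the result is consumed.
    raise MemoryError("simulated allocation failure")

def killed(_):
    # Simulates the OOM killer terminating the worker process.
    os.kill(os.getpid(), signal.SIGKILL)

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        try:
            list(executor.map(raises, range(1)))
        except MemoryError as ex:
            print(f"handled normally: {ex}")

    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
        try:
            list(executor.map(killed, range(1)))
        except concurrent.futures.process.BrokenProcessPool as ex:
            print(f"pool is broken: {ex}")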
My solution was to simply catch the BrokenProcessPool exception and inform the user of the issue; I also added an option which runs the processing tasks in one process, in which case any too-large files are simply marked as having an error:
files = get_files()
processor = get_processor_instance()
results = []
if args.nonconcurrent:
    results = list(map(processor.process, files))
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        try:
            results = list(executor.map(processor.process, files))
        except concurrent.futures.process.BrokenProcessPool as ex:
            raise MyCustomProcessingError(
                f"{ex} This might be caused by limited system resources. "
                "Try increasing system memory or disable concurrent processing "
                "using the --nonconcurrent option."
            )
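For completeness, the --nonconcurrent switch itself is nothing special, just a boolean flag on the argument parser; the sketch below is illustrative (the parser variable and help text are assumptions, not my exact code):

import argparse

# Hypothetical parser setup backing args.nonconcurrent; names and help
# text are illustrative, not the application's exact code.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--nonconcurrent",
    action="store_true",
    help="Process files sequentially in a single process instead of a "
         "process pool (slower, but avoids BrokenProcessPool when memory "
         "is tight).",
)
args = parser.parse_args()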