Search code examples
pythonpython-3.xpython-3.6python-multiprocessingconcurrent.futures

Python concurrent.futures: handling exceptions in child processes


I have a pretty plain vanilla implementation of concurrent.futures.ProcessPoolExecutor -- something like (using Python 3.6):

files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))

While the processor is an instance of any of a number of available processor classes, they all share the process method, which looks roughly like this:

def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
        session.commit()

Implementation of the _get_records and _save_output methods vary per class, but my problem is with handling of errors. I'm deliberately testing it so that one of those two methods runs out of memory, but I would expect the except block above to catch it and move the the next file -- and this is precisely what happens when I run the code in a single process.

If I use ProcessPoolExecutor as described above, it raises the BrokenProcessPool exception and kills all execution:

Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I can of course catch the BrokenProcessPool in the calling code, but I'd prefer to handle the error internally and proceed to the next file.

I also tried using the standard multiprocessing.Pool object, like this:

with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)

In this case, the behaviour is even weirder: after starting to process the first two files, which raise the out of memory error, it moves on to processing the later files, which are smaller so get processed completely. However, the except block apparently never gets triggered (no log messages, no error_time), and the application just hangs, neither finishing nor doing anything, until killed manually.

I was hoping that the try..except block would make each process self-contained, handling its own errors without affecting the main application. Any ideas how to achieve that?


Solution

  • So, after a lot of debugging (and with a due credit to @RomanPerekhrest's suggestion to check the executor object), I've figured the cause. As described in the question, the test data consisted of a number of files, two of which were quite large (over 1 million lines of CSV, each). Those two each caused my test machine (a 2GB VM) to choke, but in different ways -- while the first, which was larger, caused a regular out of memory error which would be handled by the except, the second simply caused a sigkill. Without exploring too much, I suspect that the larger file simply couldn't fit in the memory on read (done in the _get_records method), while the smaller one could, but then manipulation on it (done in _save_output) caused the overflow and killed the process.

    My solution was to simply catch the BrokenProcessPool exception and inform the user of the issue; I also added an option which runs the processing tasks in one process, in which case any too-large files are simply marked as having an error:

    files = get_files()
    processor = get_processor_instance()
    results = []
    if args.nonconcurrent:
        results = list(map(processor.process, files))
    else:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            try:
                results = list(executor.map(processor.process, files))
            except concurrent.futures.process.BrokenProcessPool as ex:
                raise MyCustomProcessingError(
                    f"{ex} This might be caused by limited system resources. "
                    "Try increasing system memory or disable concurrent processing "
                    "using the --nonconcurrent option."
                )