Tags: python, error-handling, multiprocessing, metpy

How to include exception handling within a Python multiprocessing pool.starmap call


I'm using the MetPy library for weather calculations and the multiprocessing library to run them in parallel, but occasionally an exception is raised that stops the whole program. I can't provide a minimal, reproducible example: I can't reproduce the failures with the MetPy functions in isolation, and a large amount of code runs before the problem occurs that I can't post here.

I want to know how to make pool.starmap pass over a data point when the function raises an error, instead of stopping. The first step in my code builds an argument list, which is then passed to pool.starmap along with the MetPy function (here metpy.calc.ccl, imported as mpcalc). Each entry in the argument list holds the pressure levels, air temperatures, and dew point values for one data point.

    ccl_pooled = pool.starmap(mpcalc.ccl, ccl_argument_list)
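For context, pool.starmap calls the function once per tuple in the argument list, unpacking each tuple into positional arguments. If any single call raises, the exception is re-raised in the parent process and none of the other results are returned. The sketch below shows this with a hypothetical stand-in, `fake_ccl`, in place of `mpcalc.ccl` so it runs without MetPy; the numbers and the arithmetic are made up.

```python
from multiprocessing import Pool


def fake_ccl(p, t, td):
    """Hypothetical stand-in for mpcalc.ccl; not a real CCL calculation."""
    if not p:
        # Mimic the rare failure with an empty profile
        raise IndexError("empty pressure profile")
    return max(p) - (t[0] - td[0])


# Each tuple is unpacked as fake_ccl(p, t, td)
argument_list = [
    ([1000, 900, 800], [20, 15, 10], [15, 10, 5]),
    ([], [], []),  # this data point raises IndexError
    ([1000, 850], [25, 18], [18, 12]),
]

if __name__ == "__main__":
    with Pool(2) as pool:
        try:
            pool.starmap(fake_ccl, argument_list)
        except IndexError as exc:
            # The worker's exception is re-raised here; no partial results
            print("starmap aborted:", exc)
```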

I tried writing a wrapper function that calls the MetPy function and simply passes when it encounters an error:

    import metpy.calc as mpcalc

    def run_ccl(p, t, td):
        try:
            return mpcalc.ccl(p, t, td)
        except IndexError:
            pass  # falls through, so run_ccl returns None for this point

Is there a way to write run_ccl so that errors are handled while I keep my original starmap line, something like this:

    ccl_pooled = pool.starmap(run_ccl, ccl_argument_list)

If not, what would be the best way to do this?

EDIT: To clarify, these argument lists are thousands of data points long. I want to skip any data point that causes a problem (storing NaN for that data point in the result, ccl_pooled) and keep going.
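One way to get that behavior, sketched here under the assumption that the wrapper lives at module level (multiprocessing pickles the function by name, so nested functions and lambdas will not work): catch the exception inside the wrapper and return NaN, so starmap still returns one entry per input, in input order. `fake_ccl` is a hypothetical stand-in for `mpcalc.ccl`; with the real function you would call `mpcalc.ccl(p, t, td)` instead. If the successful results are tuples, a tuple of NaNs of the same length may be easier to post-process than a bare NaN.

```python
import math
from multiprocessing import Pool


def fake_ccl(p, t, td):
    """Hypothetical stand-in for mpcalc.ccl; not a real CCL calculation."""
    if not p:
        raise IndexError("empty pressure profile")
    return max(p) - (t[0] - td[0])


def run_ccl(p, t, td):
    """Module-level wrapper, so multiprocessing can pickle it by name."""
    try:
        return fake_ccl(p, t, td)
    except IndexError:
        # Skip this data point but keep its slot in the output
        return float("nan")


argument_list = [
    ([1000, 900, 800], [20, 15, 10], [15, 10, 5]),
    ([], [], []),  # fails, becomes NaN
    ([1000, 850], [25, 18], [18, 12]),
]

if __name__ == "__main__":
    with Pool(2) as pool:
        ccl_pooled = pool.starmap(run_ccl, argument_list)
    # Results stay in input order; failed points are NaN
    failed = [i for i, r in enumerate(ccl_pooled) if math.isnan(r)]
    print(ccl_pooled, "failed indices:", failed)
```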


Solution

  • You can generalize run_ccl with a wrapper function that suppresses specified exceptions and returns NaN as a default value:

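    from contextlib import suppress
    from functools import partial

    def _call_suppressed(func, exceptions, *args, **kwargs):
        # Module-level helper, so multiprocessing can pickle it
        with suppress(*exceptions):
            return func(*args, **kwargs)
        # Reached only if one of the listed exceptions was raised
        return float('nan')

    def suppressor(func, *exceptions):
        # partial() of a module-level function is picklable, unlike a nested closure
        return partial(_call_suppressed, func, exceptions)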

  With that in place, you can rewrite the original line as:

    ccl_pooled = pool.starmap(suppressor(mpcalc.ccl, IndexError), ccl_argument_list)
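A caveat worth checking: multiprocessing.Pool pickles the callable it sends to the workers, and the standard pickle module can serialize module-level functions, functools.partial objects wrapping them, and instances of module-level classes, but not functions defined inside another function (such as a nested wrapper). One alternative is a small callable class. `Suppressor` and `first_item` below are hypothetical names; the sketch checks that the object survives a pickle round trip, that a listed exception becomes NaN, and that unlisted exceptions still propagate.

```python
import pickle


class Suppressor:
    """Callable that returns a default instead of raising listed exceptions.

    Defined at module level, so its instances can be pickled and sent
    to multiprocessing workers (as long as func itself is picklable).
    """

    def __init__(self, func, *exceptions, default=float("nan")):
        self.func = func
        self.exceptions = exceptions
        self.default = default

    def __call__(self, *args, **kwargs):
        try:
            return self.func(*args, **kwargs)
        except self.exceptions:
            # Only the listed exception types are swallowed
            return self.default


def first_item(seq):
    """Hypothetical stand-in that raises IndexError on empty input."""
    return seq[0]


safe_first = Suppressor(first_item, IndexError)
# Round-trip through pickle, as multiprocessing would do
restored = pickle.loads(pickle.dumps(safe_first))
print(restored([7, 8]), restored([]))
```

With MetPy, the pool call would then be `pool.starmap(Suppressor(mpcalc.ccl, IndexError), ccl_argument_list)`, assuming `mpcalc.ccl` pickles by reference as module-level library functions normally do. Any exception type not passed to `Suppressor` (a ValueError, for example) will still stop the run, which is usually what you want for unexpected failures.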