Search code examples
pythonpython-3.xmultiprocessingpython-multiprocessing

unexpected behaviour of multiprocessing Pool map_async


I have some code that does the same thing to several files in a python 3 application and so seems like a great candidate for multiprocessing. I'm trying to use Pool to assign work to some number of processes. I'd like the code to continue do other things (mainly displaying things for the user) while these calculations are going on, so i'd like to use the map_async function of the multiprocessing.Pool class for this. I would expect that after calling this, the code will continue and the result will be handled by the callback I've specified, but this doesn't seem to be happening. The following code shows three ways I've tried calling map_async and the results I've seen:

import multiprocessing
NUM_PROCS = 4
def func(arg_list):
    arg1 = arg_list[0]
    arg2 = arg_list[1]
    print('start func')
    print ('arg1 = {0}'.format(arg1))
    print ('arg2 = {0}'.format(arg2))
    time.sleep(1)
    result1 = arg1 * arg2
    print('end func')
    return result1

def callback(result):
    print('result is {0}'.format(result))


def error_handler(error1):
    print('error in call\n {0}'.format(error1))


def async1(arg_list1):
    # This is how my understanding of map_async suggests i should
    # call it. When I execute this, the target function func() is not called
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)


def async2(arg_list1):
    with multiprocessing.Pool(NUM_PROCS) as p1:
        # If I call the wait function on the result for a small
        # amount of time, then the target function func() is called
        # and executes sucessfully in 2 processes, but the callback
        # function is never called so the results are not processed
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        r1.wait(0.1)


def async3(arg_list1):
    # if I explicitly call join on the pool, then the target function func()
    # successfully executes in 2 processes and the callback function is also
    # called, but by calling join the processing is not asynchronous any more
    # as join blocks the main process until the other processes are finished.
    with multiprocessing.Pool(NUM_PROCS) as p1:
        r1 = p1.map_async(func,
                          arg_list1,
                          callback=callback,
                          error_callback=error_handler)
        p1.close()
        p1.join()


def main():
    arg_list1 = [(5, 3), (7, 4), (-8, 10), (4, 12)]
    async3(arg_list1)

    print('pool executed successfully')


if __name__ == '__main__':
    main()

When async1, async2 or async3 is called in main, the results are described in the comments for each function. Could any one explain why the different calls are behaving the way they are? Ultimately I'd like to call map_async as done in async1, so i can do something in else the main process while the worker processes are busy. I have tested this code with python 2.7 and 3.6, on an older RH6 linux box and a newer ubuntu VM, with the same results.


Solution

  • This is happening because when you use the multiprocessing.Pool as a context manager, pool.terminate() is called when you leave the with block, which immediately exits all workers, without waiting for in-progress tasks to finish.

    New in version 3.3: Pool objects now support the context management protocol – see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().

    IMO using terminate() as the __exit__ method of the context manager wasn't a great design choice, since it seems most people intuitively expect close() will be called, which will wait for in-progress tasks to complete before exiting. Unfortunately all you can do is refactor your code away from using a context manager, or refactor your code so that you guarantee you don't leave the with block until the Pool is done doing its work.