
Wait for pool.apply_async inside a loop


I am trying to use multiprocessing in my Python code for the first time. I am stuck because I cannot make apply_async wait for all of its processes to finish. I'd like to process the elements in smaller chunks and save the results as I work through a long list of elements.

As a simpler example:

import multiprocessing as mp

def fun(x, y):
    print("Here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()

I would love this to produce the following output:

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]

But it actually produces this (on my machine, at least):

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]

Any suggestion is really appreciated.


Solution

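  • Try putting the whole for loop inside the if __name__ == '__main__': suite, and collect the return values with get() rather than wait(), since wait() only blocks and always returns None.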

    ...
    if __name__ == '__main__':
    
        for val in range(10):
            buffer.append(val)
            print(f'Added value: {val}')
            if len(buffer) == 5:
                pool = mp.Pool()
                res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
                # wait until they are ALL done (optional, since get() below also blocks)
                for r in res:
                    r.wait()
                # get the return values
                res = [r.get() for r in res]
                print(f'Results: {res}')
                buffer = []
                pool.close()
                pool.join()
    
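A variant of the loop above, offered only as a sketch: it creates the pool once instead of once per chunk and relies on get() alone, since get() blocks until the task finishes and returns its value. The names chunked, process_in_chunks and chunk_size are made up for illustration and are not part of the original code. Unlike the original loop, this version also processes a final, shorter chunk if the input length is not a multiple of chunk_size.

```python
import multiprocessing as mp


def fun(x, y):
    return x + y


def chunked(values, size):
    """Yield successive lists of at most `size` items (hypothetical helper)."""
    chunk = []
    for value in values:
        chunk.append(value)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        # Leftover items that did not fill a whole chunk.
        yield chunk


def process_in_chunks(values, chunk_size=5):
    """Run fun(x, x) for every value, one chunk at a time."""
    all_results = []
    # One pool for the whole run; leaving the with-block shuts it down.
    with mp.Pool() as pool:
        for chunk in chunked(values, chunk_size):
            pending = [pool.apply_async(fun, args=(x, x)) for x in chunk]
            # get() blocks until the task is done and returns its value;
            # wait() would also block, but it always returns None.
            results = [r.get() for r in pending]
            print(f"Results: {results}")
            all_results.append(results)
    return all_results


if __name__ == "__main__":
    # Everything that starts worker processes stays under the guard.
    process_in_chunks(range(10))
```

Because each chunk's results are fully collected before the next chunk is submitted, this is also the natural place to save intermediate results.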

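    Here is your original with some extra inspection. It shows the lines in the for loop running in multiple Python processes. That is consistent with the spawn start method that multiprocessing uses on Windows: each worker process re-imports the main module, so any top-level code that is not under the if __name__ == "__main__": guard runs again in every worker.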

    import multiprocessing as mp
    import os
    import pickle
    import sys
    
    def fun(x, y, pid=None):
        print(f"here pid:{pid}",file=sys.stderr)
        return (x+y,pid)
    
    buffer = []
    stuff = []
    
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    
    for val in range(10):
        buffer.append(val)
        pid = os.getpid()
        print(f'Added value: {val}.   pid={pid}')
        d = {'val':val,'pid':pid}
        with open(r'c:\pyProjects\stuff.pkl','rb') as f:
            try:
                stuff = pickle.load(f)
                stuff.append(d)
            except EOFError as e:
                s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
                print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
        with open(r'c:\pyProjects\stuff.pkl','wb') as f:
            pickle.dump(stuff,f)
        if len(buffer) == 5:
            print(buffer)
            #It is my understanding, this is necessary on Windows
            if __name__ == "__main__":
                pool = mp.Pool()
                res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
                res = [r.get() for r in res]
                print(f'\t\t\tResults: {res}')
                buffer = []
                pool.close()
                pool.join()
    

    After it finishes you can load and peruse the pickled file with

    >>> import pickle
    >>> from pprint import pprint
    >>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
    ...     a = pickle.load(f)
    
    >>> a.sort(key=lambda x: x['pid'])
    >>> pprint(a)
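
A smaller way to see the same effect without the pickle file, as a sketch: it requests the spawn start method explicitly (an assumption made here so that it behaves the same on Linux and macOS as on Windows) and prints the process id from module-level code. The function name main and the pool size of 2 are arbitrary choices for illustration. It should be saved to a file and run as a script, because spawned workers need to re-import the main module from that file.

```python
import multiprocessing as mp
import os

# Module-level code: under the spawn start method each worker re-imports
# this file, so this line runs once in the parent and once per worker.
print(f"module-level code running in pid {os.getpid()}")


def fun(x, y):
    return x + y


def main():
    # Ask for spawn explicitly; it is the default start method on Windows.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        # starmap blocks until every task is done and returns the values.
        return pool.starmap(fun, [(x, x) for x in range(5)])


if __name__ == "__main__":
    # Workers import this file under a name other than "__main__",
    # so they never reach this block and never create pools of their own.
    print(f"Results: {main()}")
```

The module-level message appears with more than one pid, while the Results line is printed only by the parent process.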