Tags: python, dictionary, multiprocessing, pool, python-itertools

python no output when using pool.map_async


I am experiencing very strange issues while working with the data inside my function that gets called by pool.map. For example, the following code works as expected...

import csv
import multiprocessing
import itertools
from collections import deque

cur_best = 0
d_sol = deque(maxlen=9)
d_names = deque(maxlen=9)

**import CSV Data1**

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    names = [x[0] for x in vals]
    print(", ".join(names) + " = " + str(sol))

def process():
    pool = multiprocessing.Pool(processes=4)
    prod = itertools.product(([x[2], x[4], x[10]] for x in Data1))
    result = pool.map_async(calculate, prod)
    pool.close()
    pool.join()
    return result

process()

Now when I add a simple if-statement to calculate(), I get no output.

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    if sol > cur_best:
        cur_best = sol
        names = [x[0] for x in vals]
        print(", ".join(names) + " = " + str(cur_best))
        #would like to append cur_best and names to a deque

I have tried adjusting where I declare 'cur_best' to no avail.

I am trying to keep track of the 'current best' solution as I am running through the calculations. In my linear code, this logic resides in a nested for-loop and I append each new 'cur_best' to a deque.

Do my new issues relate to the way pool.map or pool.map_async work? Can I no longer treat my calculate() function as a linear loop?

There are several other conditional statements I need to address. Should I be handling this in a different part of the code? And if so, how exactly?


Solution

  • There are likely two things going on here. First, the reason you're not seeing anything printed from the worker function is probably that it's raising an exception. Because you're using map_async, you won't actually see that exception until you call result.get(). However, since you're calling close/join on the pool right after map_async, you may as well use map instead, which blocks until all the work is complete (or an exception is raised). In this case the exception is easy to pin down: assigning cur_best = sol inside calculate() makes cur_best a local variable throughout the function, so the earlier if sol > cur_best: raises UnboundLocalError. Uncommenting global cur_best would avoid that error, but still wouldn't share the value between processes, as discussed next.
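To see that behavior in isolation, here is a minimal sketch (the worker and its inputs are made up for illustration) showing how an exception inside a map_async worker stays invisible until .get() is called:

```python
import multiprocessing

def worker(x):
    # Deliberately fail for one input to demonstrate the hidden exception
    if x == 2:
        raise ValueError("bad input: %d" % x)
    return x * x

def main():
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.map_async(worker, [1, 2, 3])
        pool.close()
        pool.join()
        # All workers have finished by now, yet no traceback was printed.
        try:
            return result.get()  # the ValueError only surfaces here
        except ValueError as e:
            return str(e)

if __name__ == "__main__":
    print(main())  # bad input: 2
```

With plain map, the same exception would propagate immediately at the call site instead of being deferred.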

    Second, as Armin Rigo pointed out, cur_best is not shared between all processes, so your logic won't work the way you're intending. I think the easiest option is to use a multiprocessing.Value to create an integer in shared memory, which will be accessible to all processes.
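A minimal sketch of that idea (the names here are illustrative, not from the question): each worker receives the shared Value through the pool initializer and updates it under its built-in lock:

```python
import multiprocessing

counter = None

def init_worker(shared):
    # Runs once in each worker process; stash the shared Value in a global
    global counter
    counter = shared

def bump(_):
    # get_lock() guards the read-modify-write across processes
    with counter.get_lock():
        counter.value += 1

def main():
    shared = multiprocessing.Value("I", 0)  # unsigned int in shared memory
    with multiprocessing.Pool(processes=2, initializer=init_worker,
                              initargs=(shared,)) as pool:
        pool.map(bump, range(10))
    return shared.value

if __name__ == "__main__":
    print(main())  # 10
```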

    To append the results you're getting to a deque, you would need to create shared deques using a multiprocessing.Manager. A Manager spawns a server process that manages shared access to an object (like a deque). Each process in your pool (as well as the parent process) gets a Proxy object, which communicates with the Manager's process to read/write to the shared object.
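The same Manager/proxy idea in isolation, using the built-in list proxy rather than a custom deque proxy (the names here are illustrative): each worker appends through a proxy, and the Manager's server process holds the real list:

```python
import multiprocessing

def record(args):
    shared_list, value = args
    # The proxy forwards append() to the Manager's server process
    shared_list.append(value * value)

def main():
    with multiprocessing.Manager() as manager:
        shared = manager.list()  # proxy objects are picklable, so they can
        with multiprocessing.Pool(processes=2) as pool:  # be passed to workers
            pool.map(record, [(shared, i) for i in range(5)])
        # Sort because workers may finish in any order
        return sorted(shared)

if __name__ == "__main__":
    print(main())  # [0, 1, 4, 9, 16]
```

deque isn't one of the types Manager supports out of the box, which is why the full example below registers a custom proxy type for it.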

    Here's an example showing everything discussed above:

    import itertools
    import multiprocessing
    from collections import deque
    from multiprocessing.managers import BaseManager, MakeProxyType
    
    class DequeManager(BaseManager):
        pass
    
    BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
        '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
        '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop', 
        'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate', 
        '__imul__'
        ))
    class DequeProxy(BaseDequeProxy):
        def __iadd__(self, value):
            self._callmethod('extend', (value,))
            return self
        def __imul__(self, value):
            self._callmethod('__imul__', (value,))
            return self
    
    DequeManager.register('deque', deque, DequeProxy)
    
    
    cur_best = d_sol = d_names = None
    
    def init_globals(best, sol, names):
        """ This will be called in each worker process. 
    
        A global variable (cur_best) will be created in each worker.
        Because it is a multiprocessing.Value, it will be shared
        between each worker, too.
    
        """
        global cur_best, d_sol, d_names
        cur_best = best
        d_sol = sol
        d_names = names
    
    def calculate(vals):
        global cur_best
        sol = sum(int(x[2]) for x in vals)
        # Lock the read-compare-write so two workers can't update cur_best at once
        with cur_best.get_lock():
            if sol > cur_best.value:
                cur_best.value = sol
                names = [x[0] for x in vals]
                print(", ".join(names) + " = " + str(cur_best.value))
                d_sol.append(cur_best.value)
                d_names.append(names)
        return sol
    
    def process():
        global d_sol, d_names
        cur_best = multiprocessing.Value("I", 0)  # unsigned int
    
        m = DequeManager()
        m.start()
        d_sol = m.deque(maxlen=9)
        d_names = m.deque(maxlen=9)  
    
        pool = multiprocessing.Pool(processes=4, initializer=init_globals, 
                                    initargs=(cur_best, d_sol, d_names))
        prod = itertools.product([x[2], x[4], x[10]] for x in Data1)
        result = pool.map(calculate, prod)  # map instead of map_async
        pool.close()
        pool.join()
        return result  # Result will be a list containing the value of `sol` returned from each worker call
    
    if __name__ == "__main__":    
        print(process())