Tags: python, queue, multiprocessing, pool

How do you pass a Queue reference to a function managed by pool.map_async()?


I want a long-running process to report its progress over a Queue (or something similar), which I will feed to a progress bar dialog. I also need the result when the process completes. The test example below fails with: RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

I've been able to get this to work using individual Process objects (where I am allowed to pass a Queue reference), but then I don't have a pool to manage the many processes I want to launch. Any advice on a better pattern for this?


Solution

  • The following code seems to work:

    import multiprocessing, time
    
    def task(args):
        count = args[0]
        queue = args[1]
        for i in xrange(count):
            queue.put("%d mississippi" % i)
        return "Done"
    
    
    def main():
        manager = multiprocessing.Manager()
        q = manager.Queue()
        pool = multiprocessing.Pool()
        result = pool.map_async(task, [(x, q) for x in range(10)])
        time.sleep(1)
        while not q.empty():
            print q.get()
        print result.get()
    
    if __name__ == "__main__":
        main()
    

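    Note that the queue comes from manager.Queue() rather than multiprocessing.Queue(). A manager queue is a proxy object that can be pickled and passed to pool workers as an argument, whereas a plain multiprocessing.Queue can only be inherited by child processes. Thanks, Alex, for pointing me in this direction.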
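
The fixed time.sleep(1) in the code above only works if every task finishes within a second; anything slower leaves messages unread. Below is a minimal sketch of one way to poll instead, assuming Python 3 (range and print() replace xrange and the print statement). The drain() helper and the name "progress" are hypothetical, introduced only for illustration. The loop reads messages until map_async reports that it is ready, which is closer to what a progress bar needs.

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception


def task(args):
    """Report progress on the shared queue, then return a final result."""
    count, progress = args
    for i in range(count):
        progress.put("%d mississippi" % i)
    return "Done"


def drain(progress):
    """Return every message currently waiting on the queue, without blocking."""
    messages = []
    while True:
        try:
            messages.append(progress.get_nowait())
        except queue.Empty:
            return messages


def main():
    manager = multiprocessing.Manager()
    progress = manager.Queue()  # proxy object, can be passed to pool workers
    with multiprocessing.Pool() as pool:
        result = pool.map_async(task, [(x, progress) for x in range(10)])
        # Poll until every task has finished instead of sleeping a fixed time.
        while not result.ready():
            for message in drain(progress):
                print(message)
            result.wait(0.1)
        # Pick up anything queued between the last poll and completion.
        for message in drain(progress):
            print(message)
        print(result.get())


if __name__ == "__main__":
    main()
```

In a GUI, the print(message) calls would be replaced by whatever updates the progress dialog, and the polling would probably be driven by a timer rather than a blocking loop.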