Tags: python, python-3.x, multiprocessing, python-multiprocessing

How to add more items to a multiprocessing queue while the script is running


I am trying to learn multiprocessing with queues.

What I want to do is figure out when/how to "add more items to the queue" when the script is in motion.

The script below is the baseline I am working from:

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))
    queue.put(MyFancyClass('Frankie'))
    print(queue.qsize())

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

On line 26 (the first queue.put call), the Fancy Dan item is processed, but the Frankie item is not. I can confirm that Frankie does make it into the queue. I need a spot where I can check for more items and insert them into the queue as needed. If no more items exist, the queue should be closed once the existing items have been processed.

How do I do this?

Thanks!


Solution

  • Let's make it clear:

    • The target function worker(q) is called just once in the above scheme. On that single call it blocks on q.get() until an item arrives, receives the MyFancyClass('Fancy Dan') instance from the queue, invokes its do_something method, and returns.
    • MyFancyClass('Frankie') is put into the queue but is never consumed, because the worker calls q.get() only once and the process's target function then finishes.
    • One way to handle this is to read from the queue in a loop and stop when a sentinel item arrives, signalling that no more items will be put into the queue. Here the sentinel is None:

    import multiprocessing
    
    
    class MyFancyClass:
    
        def __init__(self, name):
            self.name = name
    
        def do_something(self):
            proc_name = multiprocessing.current_process().name
            print('Doing something fancy in {} for {}!'.format(proc_name, self.name))
    
    
    def worker(q):
        while True:
            obj = q.get()
            if obj is None:
                break
            obj.do_something()
    
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
    
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()
    
        queue.put(MyFancyClass('Fancy Dan'))
        queue.put(MyFancyClass('Frankie'))
        # print(queue.qsize())
        queue.put(None)  # sentinel: tells the worker loop to stop
    
        # Wait for the worker to finish
        queue.close()
        queue.join_thread()
        p.join()
    

    The output:

    Doing something fancy in Process-1 for Fancy Dan!
    Doing something fancy in Process-1 for Frankie!
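
To cover the "check for more items" part of the question, here is a minimal sketch of how the parent could keep feeding the queue while the worker is already running, and send the None sentinel only once nothing is left. The names fetch_more_items, run, SENTINEL and 'Late Larry' are hypothetical and not part of the original script. The batches list merely stands in for whatever real source of late work you have. For brevity the sketch passes plain strings instead of MyFancyClass instances, and it adds a second queue only so the results can be inspected in the parent.

```python
import multiprocessing

SENTINEL = None  # marker telling the worker that no more items will arrive


def worker(tasks, results):
    """Process items until the sentinel shows up."""
    while True:
        name = tasks.get()  # blocks until an item is available
        if name is SENTINEL:
            break
        proc_name = multiprocessing.current_process().name
        results.put('{} handled {}'.format(proc_name, name))


def fetch_more_items(batches):
    """Hypothetical source of late work: next batch, or [] when done."""
    return batches.pop(0) if batches else []


def run(batches):
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(tasks, results))
    p.start()

    submitted = 0
    while True:
        batch = fetch_more_items(batches)  # the "check for more items" spot
        if not batch:
            break
        for name in batch:
            tasks.put(name)  # the worker is already running and picks it up
            submitted += 1

    tasks.put(SENTINEL)  # nothing left: let the worker exit its loop

    # Drain the results before join() so the worker is never stuck
    # flushing data that nobody reads.
    handled = [results.get() for _ in range(submitted)]
    p.join()
    return handled


if __name__ == '__main__':
    for line in run([['Fancy Dan', 'Frankie'], ['Late Larry']]):
        print(line)
```

With a single worker, items are handled in the order they were put. If you start several workers, one common approach is to put one sentinel per worker so that each of them leaves its loop.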