Tags: python, multiprocessing, python-multiprocessing, python-class

Python multiprocessing.Pool.apply_async() not executing class function


In a custom class I have the following code:

import multiprocessing
import queue

class CustomClass():
    
    triggerQueue: multiprocessing.Queue

    def __init__(self):
        
        self.triggerQueue = multiprocessing.Queue()

    def poolFunc(queueString):
        
        print(queueString)

    def listenerFunc(self):

        pool = multiprocessing.Pool(5)

        while True:
            try:
                queueString = self.triggerQueue.get_nowait()
                pool.apply_async(func=self.poolFunc, args=(queueString,))

            except queue.Empty:
                break

What I intend to do is:

  • add a trigger to the queue (not implemented in this snippet) -> works as intended
  • run an endless loop within the listenerFunc that reads all triggers from the queue (if any are found) -> works as intended
  • pass each trigger to poolFunc, which should be executed asynchronously -> not working

It works as soon as I move poolFunc() outside of the class, like this:

def poolFunc(queueString):
        
    print(queueString)

class CustomClass():
    [...]

Why is that? Do I have to pass the self argument somehow? Or is this approach impossible in general?

Thank you for any hint!


Solution

  There are several problems going on here:

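    1. Your instance method, poolFunc, is missing a self parameter. A bound method passes the instance as the first argument, so self.poolFunc(queueString) calls poolFunc with two arguments and raises a TypeError inside the worker.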

    2. You never close or terminate the Pool. A multiprocessing.Pool object is a context manager, so use it in a with block. Note that leaving the block calls terminate(), not close(), so collect all results before the block ends.

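    3. You're calling apply_async, but you never wait for the results. apply_async returns an AsyncResult immediately; you need to call its get method to wait for the task and receive its return value (see the documentation). get also re-raises any exception raised in the worker, which otherwise goes unnoticed, and if the program exits before the tasks have finished, poolFunc may never run at all.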

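    4. Because the Queue object is stored on the instance, you can't pass instance methods to workers. apply_async has to pickle the callable, pickling a bound method also pickles its instance, and a multiprocessing.Queue refuses to be pickled outside of process creation (RuntimeError: Queue objects should only be shared between processes through inheritance).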
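Problems 1 and 4 can be reproduced in a single process without starting a pool, because the pool has to pickle whatever callable it receives. A minimal sketch, assuming the class is defined at module level; the names `Broken`, `check_missing_self` and `check_pickling` are hypothetical, and `pickle.dumps` stands in for the pickling that Pool does internally:

```python
import multiprocessing
import pickle


class Broken:
    """Hypothetical stand-in for the question's CustomClass."""

    def __init__(self):
        # A multiprocessing.Queue stored on the instance (problem 4).
        self.triggerQueue = multiprocessing.Queue()

    def poolFunc(queueString):  # missing `self` (problem 1)
        print(queueString)


def check_missing_self():
    # A worker calls the bound method like this: the instance is passed
    # implicitly, so poolFunc receives two arguments instead of one.
    try:
        Broken().poolFunc("testval0")
    except TypeError as exc:
        return f"TypeError: {exc}"
    return "no error"


def check_pickling():
    # Pickling a bound method pickles its instance as well, including the
    # Queue, whose __getstate__ raises RuntimeError outside process creation.
    try:
        pickle.dumps(Broken().poolFunc)
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "no error"


if __name__ == "__main__":
    print(check_missing_self())
    print(check_pickling())
```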

    We can fix all of the above like this:

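    import multiprocessing
    import queue

    # Module-level queue: CustomClass instances carry no Queue, so their
    # bound methods can be pickled and sent to the pool.
    triggerQueue = multiprocessing.Queue()


    class CustomClass:
        def poolFunc(self, queueString):
            print(queueString)

        def listenerFunc(self):

            results = []

            with multiprocessing.Pool(5) as pool:
                while True:
                    try:
                        queueString = triggerQueue.get_nowait()
                        results.append(pool.apply_async(self.poolFunc, (queueString,)))
                    except queue.Empty:
                        break

                # Wait inside the with block: leaving it calls pool.terminate().
                # get() also re-raises any exception raised in the worker.
                for res in results:
                    res.get()


    # Needed with the "spawn"/"forkserver" start methods, which re-import
    # this module in every worker process.
    if __name__ == "__main__":
        c = CustomClass()

        for i in range(10):
            triggerQueue.put(f"testval{i}")

        c.listenerFunc()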
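One caveat about the draining loop: the multiprocessing documentation notes that right after an object is put on an empty queue there may be a short delay before get_nowait() returns it instead of raising queue.Empty, so a loop that stops at the first Empty can, in principle, stop early. A minimal sketch of a more forgiving drain, assuming a short per-item timeout is acceptable; `drain` and its `timeout` value are hypothetical:

```python
import multiprocessing
import queue


def drain(q, timeout=0.1):
    """Hypothetical helper: collect every item currently in q, waiting up to
    `timeout` seconds per item so a just-put item is not missed."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for i in range(3):
        q.put(f"testval{i}")
    print(drain(q))
```

The final call always waits for the full timeout before giving up, which is the price of not missing late items.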
    

    Alternatively, you can replace the instance method with a static method (the in-class counterpart of the module-level function you mention), in which case triggerQueue can stay on the instance:

    import multiprocessing
    import queue


    class CustomClass:
        def __init__(self):
            self.triggerQueue = multiprocessing.Queue()

        @staticmethod
        def poolFunc(queueString):
            # A static method is pickled by reference (CustomClass.poolFunc),
            # so the instance and its Queue are never sent to the workers.
            print(queueString)

        def listenerFunc(self):

            results = []

            with multiprocessing.Pool(5) as pool:
                while True:
                    try:
                        queueString = self.triggerQueue.get_nowait()
                        results.append(pool.apply_async(self.poolFunc, (queueString,)))
                    except queue.Empty:
                        break

                for r in results:
                    r.get()  # wait for each task; re-raises worker exceptions


    if __name__ == "__main__":
        c = CustomClass()

        for i in range(10):
            c.triggerQueue.put(f"testval{i}")

        c.listenerFunc()
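The static-method version works because accessing a staticmethod through an instance returns the plain function, which pickle stores as a reference to its qualified name without touching the instance or its Queue. A small check of that, assuming the class lives at module level so pickle can look it up; `Fixed` and `roundtrip` are hypothetical names:

```python
import multiprocessing
import pickle


class Fixed:
    """Hypothetical stand-in for the static-method version of CustomClass."""

    def __init__(self):
        self.triggerQueue = multiprocessing.Queue()

    @staticmethod
    def poolFunc(queueString):
        return queueString.upper()


def roundtrip():
    c = Fixed()
    # c.poolFunc is the plain function, so only a reference to
    # Fixed.poolFunc is pickled, not `c` or its Queue.
    payload = pickle.dumps(c.poolFunc)
    func = pickle.loads(payload)
    return func is Fixed.poolFunc, func("testval0")


if __name__ == "__main__":
    print(roundtrip())  # (True, 'TESTVAL0')
```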
    

    Either way, we still need to collect the apply_async results by calling get() on each AsyncResult.
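A minimal illustration of why the results must be collected. It uses multiprocessing.pool.ThreadPool, an assumption made for the demo: it provides the same apply_async and AsyncResult interface as Pool but runs tasks in threads, so nothing needs to be pickled and no start-method guard is involved. An exception raised by a task is stored in its AsyncResult and only surfaces when get() is called; `fails` and `run` are hypothetical:

```python
from multiprocessing.pool import ThreadPool


def fails(queueString):
    # Hypothetical task that always raises, standing in for a broken poolFunc.
    raise ValueError(f"bad input: {queueString}")


def run():
    errors = []
    with ThreadPool(2) as pool:
        results = [pool.apply_async(fails, (f"testval{i}",)) for i in range(3)]
        # The exceptions are stored inside `results`; nothing is printed.
        for res in results:
            try:
                res.get(timeout=5)  # re-raises the task's exception here
            except ValueError as exc:
                errors.append(str(exc))
    return errors


if __name__ == "__main__":
    print(run())
```

With a process Pool the behavior is the same, except that the exception is pickled to travel back from the worker process.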