In a custom class I have the following code:
class CustomClass():
triggerQueue: multiprocessing.Queue
def __init__(self):
self.triggerQueue = multiprocessing.Queue()
def poolFunc(queueString):
print(queueString)
def listenerFunc(self):
pool = multiprocessing.Pool(5)
while True:
try:
queueString = self.triggerQueue.get_nowait()
pool.apply_async(func=self.poolFunc, args=(queueString,))
except queue.Empty:
break
What I intend to do is:
It works as soon as I source my poolFun() outside of the class like
def poolFunc(queueString):
print(queueString)
class CustomClass():
[...]
But why is that so? Do I have to pass the self argument somehow? Is it impossible to perform it this way in general?
Thank you for any hint!
There are several problems going on here.
Your instance method, poolFunc
, is missing a self
parameter.
You are never properly terminating the Pool. You should take advantage of the fact that a multiprocessing.Pool object is a context manager.
You're calling apply_async
, but you're never waiting for the results. Read the documentation: you need to call the get
method on the AsyncResult
object to receive the result; if you don't do this before your program exits your poolFunc
function may never run.
By making the Queue object part of your class, you won't be able to pass instance methods to workers.
We can fix all of the above like this:
import multiprocessing
import queue
triggerQueue = multiprocessing.Queue()
class CustomClass:
def poolFunc(self, queueString):
print(queueString)
def listenerFunc(self):
results = []
with multiprocessing.Pool(5) as pool:
while True:
try:
queueString = triggerQueue.get_nowait()
results.append(pool.apply_async(self.poolFunc, (queueString,)))
except queue.Empty:
break
for res in results:
print(res.get())
c = CustomClass()
for i in range(10):
triggerQueue.put(f"testval{i}")
c.listenerFunc()
You can, as you mention, also replace your instance method with a static method, in which case we can keep triggerQueue as part of the class:
import multiprocessing
import queue
class CustomClass:
def __init__(self):
self.triggerQueue = multiprocessing.Queue()
@staticmethod
def poolFunc(queueString):
print(queueString)
def listenerFunc(self):
results = []
with multiprocessing.Pool(5) as pool:
while True:
try:
queueString = self.triggerQueue.get_nowait()
results.append(pool.apply_async(self.poolFunc, (queueString,)))
except queue.Empty:
break
for r in results:
print(r.get())
c = CustomClass()
for i in range(10):
c.triggerQueue.put(f"testval{i}")
c.listenerFunc()
But we still need to reap the pool_async
results.