Suppose I have a pool with a few processes inside of a class that I use to do some processing, like this:
from multiprocessing import Pool

class MyClass:
    def __init__(self):
        self.pool = Pool(processes = NUM_PROCESSES)
        self.pop = []
        self.finished = []

    def gen_pop(self):
        self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue
        # Do some other stuff

    def check(self):
        self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
        new_pop = []
        for f in self.finished:
            new_pop.append(self.pop[f].get(timeout = 1))
            self.pop[f] = None
        # Do some other stuff
When I run this code I get a cPickle.PicklingError which states that a <type 'function'> can't be pickled. I take this to mean that one of the apply_async calls has not returned yet, so I am attempting to append a still-running function to another list. But that shouldn't be happening, because all running calls should have been filtered out by the ready() method.
On a related note, the exact nature of the Item class is unimportant, but what is important is that at the top of my Item.test function I have a print statement for debugging purposes, and it never fires. This tells me that the call has been submitted but has not actually started executing.
So it appears that ready() does not actually tell me whether a call has finished executing. What exactly does ready() do, and how should I edit my code so that I can filter out the calls that are still running?
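Multiprocessing uses the pickle module internally to pass data between processes, so everything you send to a worker, including the function itself, must be picklable. See the list of what can be pickled in the pickle documentation; in Python 2, methods of objects are not on that list.
ready() is not the culprit here: the task fails while it is being pickled, before it ever reaches a worker (which is also why your print statement never runs). A call that failed still counts as completed, so ready() returns True, and get() re-raises the stored exception, which is the PicklingError you see.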
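Because the rules for what pickles differ between Python versions, a quick way to check whether a particular object can be sent to a worker is simply to try pickling it. A minimal Python 3 sketch; `is_picklable` is a hypothetical helper for illustration, not part of multiprocessing:

```python
import pickle


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, i.e. obj could be sent to a Pool worker.

    Hypothetical helper for illustration; multiprocessing does not provide it.
    """
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # Lambdas, locally defined functions, generators, locks, etc. end up here.
        return False
    return True


print(is_picklable(len))          # True: built-in functions are pickled by name
print(is_picklable(lambda x: x))  # False: a lambda cannot be looked up by name
```

Running this check on both the function and the arguments you pass to apply_async tells you which one the worker would choke on.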
To solve this quickly, use a module-level wrapper function around the method:

def wrap_item_test(item):
    # A module-level function is pickled by name, so it can be sent to a worker.
    return item.test()

class MyClass:
    # __init__ and check() as before
    def gen_pop(self):
        self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ]
        while (not self.check()):
            continue
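The polling in check() can also be made more robust. As written, it sets finished entries of self.pop to None, so if check() runs more than once it calls ready() on None, and `while (not self.check()): continue` spins the CPU. Below is a minimal Python 3 sketch that keeps only the AsyncResult objects that are still running and sleeps between polls; `drain_ready` and `wait_all` are hypothetical names, not part of multiprocessing. Because ready() is also True for a call that raised, it uses successful() to separate results from failures.

```python
import time


def drain_ready(pending):
    """Split AsyncResult objects into finished results, errors, and still-running ones.

    Hypothetical helper. ready() is True once a call has completed, whether it
    returned normally or raised, so failures are collected separately.
    """
    results, errors, still_pending = [], [], []
    for async_result in pending:
        if not async_result.ready():
            still_pending.append(async_result)
        elif async_result.successful():
            results.append(async_result.get())
        else:
            try:
                async_result.get()  # re-raises the exception from the failed call
            except Exception as exc:
                errors.append(exc)
    return results, errors, still_pending


def wait_all(pending, poll_interval=0.05):
    """Poll until every AsyncResult has completed, sleeping instead of busy-waiting."""
    results, errors = [], []
    while pending:
        done, failed, pending = drain_ready(pending)
        results.extend(done)
        errors.extend(failed)
        if pending:
            time.sleep(poll_interval)
    return results, errors
```

In gen_pop, something like `results, errors = wait_all(self.pop)` could replace the busy-wait loop. Results are gathered as calls finish, so they are not necessarily in submission order. If you only need every result in submission order, calling get() on each AsyncResult in turn (or using pool.map) blocks until each call is done and avoids polling altogether.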