I'm looking for a way to dynamically add arguments to a Pool of workers during the same iteration, so that when one of them fails I can promptly re-process it.
from numpy import random
from multiprocessing import Pool
from time import sleep

def foo(x):
    sleep(0.1)
    # 50% chance to have a fault
    return x, x if random.rand() > 0.5 else -1

random.seed(3)         # seed
pool = Pool(2)         # two worker processes
args = list(range(5))  # arguments to process

for i, (id, x) in enumerate(pool.imap(foo, args)):
    print(i, x)
    if x != -1:
        args.remove(id)

print(args)
The output is
0 0
1 1
2 2
3 3
4 -1
[4]
but I'd like it to be
0 0
1 1
2 2
3 3
4 -1
5, 4
[]
within the same iteration. I mean, I don't want to create a new map on the same Pool of workers once the iteration is complete. I'd like to push new arguments in directly, so that if a task fails at the first iteration I don't have to wait until the end before using the available processes. I hope that makes sense...
Update: my problem above is simplified; the real "foo" function takes about 20 minutes to complete and is spread across 24 processes which run concurrently. As soon as one process fails I need to re-process its argument as soon as possible, since I don't want to wait 20 minutes when I have resources available.
As far as I know, you can't add a task to a currently-running Pool (without creating a race condition or undefined behavior, as you're currently seeing). Luckily, since all you need to do is retry any failed tasks until successful completion, you don't actually need to add anything to the Pool. All you need to do is modify the mapped function to behave the way you want.
def foo(x):
    sleep(0.1)
    # 50% chance to have a fault
    return x, x if random.rand() > 0.5 else -1

def successful_foo(x):
    '''Version of the foo(x) function that never fails.'''
    result = -1
    while result == -1:
        id, result = foo(x)  # foo returns an (id, result) tuple
    return id, result
Now you can pool.imap(successful_foo, args) and be assured that every task will complete successfully (or run forever). If it's possible that it could run forever and you want an option to abort after some number of tries or some amount of time, just replace the while loop with an appropriate counter or timer.
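For instance, here is a minimal sketch of a bounded-retry variant; the max_tries parameter and the RuntimeError are my own additions for illustration, not part of the code above:

def successful_foo(x, max_tries=10):
    '''Version of foo(x) that gives up after max_tries failed attempts.'''
    for _ in range(max_tries):
        id, result = foo(x)
        if result != -1:
            return id, result
    # Hypothetical failure policy: surface the persistent fault to the caller.
    raise RuntimeError('foo(%r) still failing after %d tries' % (x, max_tries))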
Of course, in many non-demo cases, having a special return value to indicate failure is impractical. In that situation, I prefer to use a specialized Exception to handle the sorts of predictable failures you might encounter:
class FooError(Exception):
    pass

def foo(x):
    sleep(0.1)
    # 50% chance to have a fault
    if random.rand() > 0.5:  # fault condition
        raise FooError('foo had an error!')
    return x, x

def successful_foo(x):
    '''Version of the foo(x) function that never fails.'''
    while True:
        try:
            return foo(x)
        except FooError:
            pass  # Log appropriately here; etc.
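Either way, the driving loop on your side stays an ordinary imap over the pool. A minimal usage sketch, assuming the imports and definitions above (the worker count of 24 comes from your update):

if __name__ == '__main__':
    pool = Pool(24)  # one worker per concurrent task, per the update above
    args = range(5)
    for i, (id, x) in enumerate(pool.imap(successful_foo, args)):
        # Every yielded result is a success; failed runs were retried inside the worker.
        print(i, x)
    pool.close()
    pool.join()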