
Dynamically add arguments to a Pool of workers


I'm looking for a way to dynamically add arguments to a Pool of workers during the same iteration, so that if some of them fail I can promptly re-process them.

from numpy import random
from multiprocessing import Pool
from time import sleep

def foo(x):
    sleep(0.1)
    # 50% chance to have a fault
    return x, x if random.rand() > 0.5 else -1

random.seed(3)      # seed
pool = Pool(2)          # pool of 2 worker processes
args = list(range(5))   # arguments to process

for i, (idx, x) in enumerate(pool.imap(foo, args)):
    print(i, x)
    if x != -1:
        args.remove(idx)

print(args)

The output is

0 0
1 1
2 2
3 3
4 -1
[4]

but I'd like it to be

0 0
1 1
2 2
3 3
4 -1
5 4
[]

within the same iteration. I mean, I don't want to create a new map on the same Pool of workers once the iteration is complete. I'd like to push new arguments directly, so that if a task fails at the first iteration I don't have to wait until the end before using the available processes. I hope it makes sense...

Update: the problem above is a simplified version of mine. In the real case the "foo" function takes about 20 minutes to complete, and it's spread across 24 processes which run concurrently. As soon as one process fails I need to re-process it as quickly as possible, since I don't want to wait 20 minutes when I have available resources.


Solution

  • As far as I know, you can't add a task to a currently-running Pool (without creating a race condition or undefined behavior, as you're currently seeing). Luckily, since all you need to do is retry any failed tasks until successful completion, you don't actually need to add anything to the Pool. All you need to do is modify the mapped function to behave the way you want.

    def foo(x):
        sleep(0.1)
        # 50% chance to have a fault
        return x, x if random.rand() > 0.5 else -1
    
    def successful_foo(x):
        '''Version of the foo(x) function that never fails.'''
    
        result = (x, -1)
        while result[1] == -1:   # foo returns (x, -1) on failure
            result = foo(x)
        return result
    

    Now you can pool.imap(successful_foo, args), and be assured that every call will complete successfully (or run forever). If it's possible that it could run forever and you want the option to abort after some number of tries or some amount of time, just replace the while loop with an appropriate counter or timer, as sketched below.
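    For instance, a bounded variant might look like this (a sketch; the bounded_foo name and the max_tries limit are illustrative additions, not part of the code above):

    def bounded_foo(x, max_tries=10):
        '''Version of foo(x) that gives up after max_tries attempts.'''

        result = (x, -1)
        for _ in range(max_tries):
            result = foo(x)
            if result[1] != -1:    # success: second element holds the real value
                break
        return result              # still (x, -1) if every attempt failed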


    Of course, in many non-demo cases, having a special return value to indicate failure is impractical. In that situation, I prefer to use a specialized Exception to handle the sorts of predictable failures you might encounter:

    class FooError(Exception):
        pass
    
    def foo(x):
        sleep(0.1)
        # 50% chance to have a fault
        if random.rand() > 0.5:  # fault condition
            raise FooError('foo had an error!')
        return x, x
    
    def successful_foo(x):
        '''Version of the foo(x) function that never fails.'''
    
        while True:
            try:
                return foo(x)
            except FooError as e:
                pass  # Log appropriately here; etc.
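
    With either version, the driver loop from the question becomes trivial, since every result that comes back is a success. As a minimal usage sketch (assuming the exception-based foo and successful_foo above; the __main__ guard is an addition for portability across start methods, not part of the original code):

    from multiprocessing import Pool

    if __name__ == '__main__':
        pool = Pool(2)
        for i, result in enumerate(pool.imap(successful_foo, range(5))):
            print(i, result)   # each result is a successful (x, x) pair
        pool.close()
        pool.join()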