python-3.x celery celery-task

How can I create a chain of conditional subtasks in Celery?


I'm creating an application that builds a chain of tasks to perform, but each task should be included in the chain only if the user wants that piece to be added.

For example, if the user wants to start_boo, the chain might be:

def start_boo():
    chain = start_foo.s() | start_bar.s() | start_baz.s()
    chain()

However, if foo and baz have already been started, we would not want to do this and would instead prefer something like:

def start_boo(foo=True, bar=True, baz=True):
    if not (foo or bar or baz):
        raise Exception("At least one should be true...")
    chain = None
    if foo:
        chain |= start_foo.s()
    if bar:
        chain |= start_bar.s()
    if baz:
        chain |= start_baz.s()
    chain()

start_boo(foo=False, baz=False)

However, this won't work for various reasons; for one, chain starts out as None rather than as a task signature.

Is there an idiom for doing something like this?


Solution

  • The idiom is the reduce function from functools. You can do the following:

    import functools

    def start_boo(foo=True, bar=True, baz=True):
        if not (foo or bar or baz):
            raise Exception("At least one should be true...")

        todo_tasks = [foo, bar, baz]
        start_tasks = [start_foo, start_bar, start_baz]

        # Keep only the start tasks whose flag in todo_tasks is True.
        tasks = [start_task for todo, start_task in zip(todo_tasks, start_tasks) if todo]
        # Split off the first task; the guard above ensures tasks is non-empty.
        first_task, *rest = tasks

        # Start with the first task's signature and pipe in each remaining one.
        chain = functools.reduce(lambda x, y: x | y.s(), rest, first_task.s())
        chain()
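
    To see how the filter-then-reduce idiom behaves without a Celery worker or broker, here is a minimal sketch. FakeSignature and build_chain are hypothetical stand-ins invented for illustration; FakeSignature only mimics the `|` operator and is not part of Celery. The sketch also assumes that calling reduce without an initial value is acceptable, which works because the guard rules out an empty list.

```python
import functools


class FakeSignature:
    """Hypothetical stand-in for a Celery signature; it only supports `|`."""

    def __init__(self, names):
        self.names = list(names)

    def __or__(self, other):
        # Piping two stand-ins yields a new one holding both name lists in order.
        return FakeSignature(self.names + other.names)


def build_chain(foo=True, bar=True, baz=True):
    if not (foo or bar or baz):
        raise ValueError("At least one should be true...")

    flags = [foo, bar, baz]
    steps = [
        FakeSignature(["start_foo"]),
        FakeSignature(["start_bar"]),
        FakeSignature(["start_baz"]),
    ]

    # Drop every step whose flag is False.
    selected = [step for flag, step in zip(flags, steps) if flag]
    # With no initial value, reduce uses the first selected step as the seed.
    return functools.reduce(lambda left, right: left | right, selected)


print(build_chain(foo=False).names)
```

    With a single selected step, reduce simply returns that step, so no special case is needed. In real Celery code, the chain primitive also accepts signatures as positional arguments, so something like chain(*(t.s() for t in tasks)) may be an alternative worth checking against the Celery canvas documentation.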