Search code examples
airflowairflow-taskflow

How to dynamic task mapping multiple parameters as pair, not cross product using taskflow api?


I would like to have parameter unpack in pair, set (if there is more than 2), not cross product.

@task
def add(x: int, y: int):
    return x + y


added_values = add.expand(x=[4, 8], y=[40, 80])
# I need
# add(x=4, y=40)
# add(x=8, y=80)

# I dont need
# add(x=4, y=80)
# add(x=8, y=40) 

Solution

  • You could instead take a single parameter, and pass x and y zipped together:

        @task
        def add(xy):
            return xy
    
        added_values = add.expand(xy=list(zip([4, 8], [40, 80])))
    

    This would also work for 3+ parameters, for example:

    added_values = add.expand(xy=list(zip([4, 8], [40, 80], [400, 800])))
    

    Taskflow also supports zipping the output of tasks to achieve this without hardcoding.