Prefect: How to imperatively create tasks based on a task list derived from a Parameter


I'm trying to define tasks imperatively based on a list. The challenge is that the list should be based on a Prefect parameter.

Below is the code I tried. It doesn't work because task_dependency_pairs is a task, not a list, so it can't be iterated over while the flow is being defined.

How do I make it work without breaking the dependency between the Parameter task and the dynamically generated tasks?

from prefect import task, Flow, Parameter, Task
import time

@task
def task_dependency_pairs(param):
    return [
        ('task 1', f'{param}A', ''),
        ('task 2', f'{param}B', 'task 1'),
        ('task 3', f'{param}C', 'task 1'),
    ]

class Task_class(Task):

    def run(self, **kwarg):   
        time.sleep(5)
        print(f"This task {kwarg['task_name']} does a lot of things with {kwarg.get('calc_value','')}.")

for task_name, calc_value, dependency in task_dependency_pairs:  # This won't work either: it iterates over a task
    globals()[task_name] = type(task_name, (Task_class,),{"__module__": __name__})

with Flow("my_process") as flow:

    param = Parameter("param", default="default_param")
    task_dependency_pairs_list = task_dependency_pairs(param)
    for task_name, calc_value, dependency in task_dependency_pairs_list:   # This won't work
        task_instance = globals()[task_name](name=task_name)
        flow.add_task(task_instance(task_name = task_name, calc_value = calc_value))

    for task_name, calc_value, dependency in task_dependency_pairs_list:  # This won't work
        if len(dependency) > 0:
            flow.get_tasks(name=task_name)[0].set_upstream(flow.get_tasks(name=dependency)[0])

flow.visualize()

Solution

  • Dynamically creating tasks in a Prefect Flow is best handled with the mapping functionality.

    Mapping, however, only generates tasks from an iterable during the flow run. It can't give the generated tasks individual dependencies; they all share the dependencies defined on the mapped task.

    If you want to generate a flow at runtime (with programmatic dependencies), the only way I can think of is to create a task that builds a flow and immediately runs it.

    For your flow, that might look like this:

    ...
    
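    @task
    def run_flow(inputs):
        # inputs is the resolved list of (name, calc_value, dependency) tuples,
        # so it can be iterated here, at run time.
        with Flow("subflow") as sub_flow:
            for (name, calc_value, dependency) in inputs:
                inst = Task_class(name=name)(task_name=name, calc_value=calc_value)
                sub_flow.add_task(inst)
                if dependency:
                    # The upstream task is looked up by name, so it must
                    # already have been added earlier in the loop.
                    inst.set_upstream(sub_flow.get_tasks(name=dependency)[0])
    
        # Run the generated flow inside the outer task.
        sub_flow.run()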
    
    with Flow("my_process") as flow:
        param = Parameter("param", default="default_param")
        task_dependency_pairs_list = task_dependency_pairs(param)
        run_flow(task_dependency_pairs_list)
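
One thing to watch in run_flow: each dependency is looked up by name with get_tasks, so an upstream task has to appear in the list before the tasks that depend on it. If that order isn't guaranteed, the list could be sorted first. Below is a minimal sketch of that idea using only the standard library (graphlib, Python 3.9+). The helper name order_by_dependency and the sample values are made up for illustration, and the sketch assumes task names are unique and each task has at most one dependency, as in the question.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def order_by_dependency(triples):
    """Return the (name, calc_value, dependency) triples with every
    upstream task placed before the tasks that depend on it.

    Hypothetical helper, not part of Prefect.
    """
    by_name = {name: (name, calc_value, dep) for name, calc_value, dep in triples}

    # Fail early if a dependency names a task that is not in the list.
    unknown = {dep for _, _, dep in triples if dep and dep not in by_name}
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")

    # Map each task name to the set of names it depends on ('' means none).
    graph = {name: ({dep} if dep else set()) for name, _, dep in triples}

    # static_order() yields predecessors first and raises CycleError on a cycle.
    return [by_name[name] for name in TopologicalSorter(graph).static_order()]


# Sample input with the upstream task deliberately listed second.
pairs = [
    ("task 2", "xB", "task 1"),
    ("task 1", "xA", ""),
    ("task 3", "xC", "task 1"),
]
ordered = order_by_dependency(pairs)
print([name for name, _, _ in ordered])
```

Inside run_flow, the loop could then iterate over order_by_dependency(inputs) instead of inputs, so the get_tasks lookup always finds the upstream task.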