python, workflow, prefect

Looping tasks in Prefect


I want to run a group of tasks repeatedly until a certain condition is met, and only then continue with the rest of the workflow.

What I have so far is this:

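import prefect
from prefect import Task
from prefect.engine.signals import LOOP

# Loop task: re-runs itself until the looped value reaches 10
class MyLoop(Task):
    def run(self):
        # On the first run the context has no loop result yet, so start from 1
        loop_res = prefect.context.get("task_loop_result", 1)
        print(loop_res)
        if loop_res >= 10:
            return loop_res
        # Re-run this task; the result is exposed as "task_loop_result"
        raise LOOP(result=loop_res + 1)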

But as far as I understand, this only works for a single task. Is there a way to go further back in the flow and loop over several tasks at once?


Solution

  • The solution is to create a single task that builds a new flow with one or more parameters and calls flow.run() on it. Raising LOOP from that task then repeats every task inside the subflow. For example:

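    import prefect
    from prefect import Flow, Parameter, Task
    from prefect.engine.signals import LOOP
    from prefect.executors import LocalDaskExecutor  # prefect.engine.executors in older releases

    class MultipleTaskLoop(Task):
        def run(self):
            # Get the value from the previous iteration (1 on the first run)
            loop_res = prefect.context.get("task_loop_result", 1)

            # Create the subflow: print_loop -> add_value -> print_loop
            with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
                x = Parameter('x', default=1)
                loop1 = print_loop()
                add = add_value(x)
                loop2 = print_loop()
                loop1.set_downstream(add)
                add.set_downstream(loop2)

            # Run the subflow and extract the result of the add task
            subflow_res = flow.run(parameters={'x': loop_res})
            # State.result is the public accessor for the value held in _result
            new_res = subflow_res.result[add].result

            # Loop until the value reaches 10
            if new_res >= 10:
                return new_res
            raise LOOP(result=new_res)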

    Here, print_loop is a task that simply prints "loop" to the output, and add_value is a task that adds one to the value it receives.
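
To check the expected behaviour without a Prefect installation, the same control flow can be sketched in plain Python. This is only an illustration, not how Prefect implements looping: run_subflow and loop_until are hypothetical names, the two helpers are plain functions rather than Prefect tasks, and the while-loop stands in for the task being re-run each time LOOP is raised.

```python
def print_loop():
    # Stand-in for the print_loop task: just prints "loop"
    print("loop")


def add_value(x):
    # Stand-in for the add_value task: adds one to its input
    return x + 1


def run_subflow(x):
    # Mirrors the subflow ordering: print_loop -> add_value -> print_loop
    print_loop()
    result = add_value(x)
    print_loop()
    return result


def loop_until(threshold, start=1):
    # Plays the role of raise LOOP(result=...): feed each result back in
    # as the next input until it reaches the threshold
    value = start
    iterations = 0
    while True:
        value = run_subflow(value)
        iterations += 1
        if value >= threshold:
            return value, iterations


final_value, n_iterations = loop_until(10)
print(final_value, n_iterations)
```

Starting from 1 with a threshold of 10, the group of steps runs nine times and the final value is 10, which is what the MultipleTaskLoop task above should return.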