Search code examples
pythonprefect

How to resume a Prefect flow on failure without having to re-run the entire flow?


TL;DR;

I wasn't able to use prefect's FlowRunner to solve the above question. I likely either used it wrong (see below) or missed something. Would really appreciate any pointers!


The Problem

I read through the fantastic prefect core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant to this (may have missed something!). The FlowRunner class appeared (to me) to be the solution.

To see if I could use Flow Runner to resume a failed flow:

  • Made a failing flow run:
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    sleep(3)
    return


@task
def failure():
    return 1 / 0


def get_flow_runner():
    with Flow("Success/Failure") as flow:

        success()
        failure()

    return prefect.engine.FlowRunner(flow)
  • Ran it in iPython and saved the state:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
  • Replaced 1 / 0 with 1 / 1 in failure() so task would be successful:

  • And finally passed the previous state to the flow_runner hoping that it would resume the flow:

In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)

The entire flow ran again including the 3 second successful task.


Solution

  • The issue here is that you are rebuilding your Flow with each run, which changes the Task objects. state.result is a dictionary whose keys are Task objects - if the underlying Task object changes in any way, so will its hash. You should instead manually create the dictionary of states using the updated Task objects, like so:

    from prefect.engine.state import Success
    
    failure_task = runner.flow.get_tasks(name="failure")[0]
    task_states = {failure_task: Success("Mocked success")}