I wasn't able to use Prefect's `FlowRunner` to solve the question above. I either used it wrong (see below) or missed something. I'd really appreciate any pointers!
I read through the fantastic Prefect Core documentation and found the sections on Handling Failure and Local Debugging to be the most relevant (I may have missed something!). The `FlowRunner` class appeared to me to be the solution.
To see whether I could use `FlowRunner` to resume a failed flow, I wrote the following script:
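```python
from time import sleep

import prefect
from prefect import Flow, task


@task
def success():
    # Slow task that always succeeds
    sleep(3)
    return


@task
def failure():
    # Always raises ZeroDivisionError
    return 1 / 0


def get_flow_runner():
    # Builds a fresh Flow (and fresh Task objects) on every call
    with Flow("Success/Failure") as flow:
        success()
        failure()
    return prefect.engine.FlowRunner(flow)
```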
I ran the script in IPython:

```
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
```
Next, I replaced `1 / 0` with `1 / 1` in `failure()` so that the task would succeed, and finally passed the previous state to the flow runner, hoping that it would resume the flow:
```
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
```
The entire flow ran again, including the 3-second `success` task.
The issue here is that you are rebuilding your Flow on each run, which creates new Task objects. `state.result` is a dictionary whose keys are Task objects, so the keys from the previous run no longer match any task in the rebuilt flow, and the old states are ignored. Instead, build the dictionary of states manually using the new Task objects, like so:
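```python
from prefect.engine.state import Success

# Look up the task object that belongs to the *current* flow
failure_task = flow_runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}

flow_runner.run(task_states=task_states)
```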
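To illustrate why the old `state.result` is ignored, here is a minimal pure-Python sketch that needs no Prefect install. `FakeTask`, `build_flow`, and `remap_states_by_name` are hypothetical stand-ins, not Prefect API, and the sketch assumes tasks use Python's default identity-based `__eq__`/`__hash__`. It shows that freshly built tasks never match the old dictionary keys, and how re-keying the old states by task name carries them over to the new objects.

```python
class FakeTask:
    """Hypothetical stand-in for a Prefect Task (default identity hashing assumed)."""

    def __init__(self, name):
        self.name = name


def build_flow():
    # Every call creates brand-new task objects, like get_flow_runner() does
    return [FakeTask("success"), FakeTask("failure")]


def remap_states_by_name(old_states, new_tasks):
    """Re-key a {task: state} dict so its keys are the new task objects."""
    by_name = {task.name: state for task, state in old_states.items()}
    return {task: by_name[task.name] for task in new_tasks if task.name in by_name}


first_run = build_flow()
old_states = {first_run[0]: "Success", first_run[1]: "Failed"}

second_run = build_flow()
# None of the new tasks is a key in the old dict, so the old states are ignored
print([task in old_states for task in second_run])

# After re-keying by name, each new task maps to its previous state
new_states = remap_states_by_name(old_states, second_run)
print([new_states[task] for task in second_run])
```

Matching by name assumes task names are unique within the flow; with duplicate names you would need a different key.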