I created a test pipeline and it fails mid-way. I want to re-execute it programmatically, starting at the failed step and moving forward, without repeating the earlier, successful steps.
from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random

instance = DagsterInstance.ephemeral()

@solid
def step1(context, data):
    # str(i) is needed: 'a' + i would raise TypeError once the generator is consumed
    return range(10), ('a' + str(i) for i in range(10))

@solid
def step2(context, step1op):
    x, y = step1op
    # simulation of noise
    xx = [el * (1 + 0.1 * random()) for el in x]
    xx2 = [(el - 1) / el for el in xx]
    return zip(xx, xx2), y

@solid
def step3(context, step2op):
    x, y = step2op
    ...
    return x, y

run_config = {...}

@pipeline
def inputs_pipeline():
    step3(step2(step1()))
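The mid-way failure can be reproduced outside Dagster. This is a minimal sketch, assuming the failure is the division in step2: the first value coming from range(10) is 0, so the noisy value is still 0 and the division raises. The `failed` flag is only there for illustration.

```python
from random import random

# Same arithmetic as step2, applied to the values step1 returns (range(10)).
x = range(10)
xx = [el * (1 + 0.1 * random()) for el in x]  # xx[0] is 0.0 because x[0] is 0

try:
    xx2 = [(el - 1) / el for el in xx]
    failed = False
except ZeroDivisionError as exc:
    # The first element divides by zero, so the whole comprehension aborts.
    failed = True
    print(f"step2 logic fails: {exc}")
```

With this input the exception is deterministic, so step1 succeeds and step2 fails on every run.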
Programmatic re-execution of part of the pipeline requires the ID of the parent run, which is available from the instance:
parent_run_id = instance.get_runs()[0].run_id
Then I re-execute the pipeline:
result = reexecute_pipeline(inputs_pipeline, parent_run_id=parent_run_id,
                            step_keys_to_execute=['step2.compute', 'step3.compute'],
                            run_config=run_config, instance=instance)
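The step keys above are typed by hand. A minimal sketch of building that list instead, assuming a strictly linear pipeline and that the name of the failed solid is already known. `steps_from_failure` is a hypothetical helper, not part of Dagster, and the `.compute` suffix simply mirrors the keys used above.

```python
def steps_from_failure(ordered_steps, failed_step):
    """Return step keys for the failed step and every step after it, in order."""
    start = ordered_steps.index(failed_step)  # raises ValueError if the name is unknown
    return [f"{name}.compute" for name in ordered_steps[start:]]


# Solid names in execution order, taken from inputs_pipeline.
step_keys = steps_from_failure(["step1", "step2", "step3"], "step2")
print(step_keys)  # ['step2.compute', 'step3.compute']
```

The resulting list could then be passed as `step_keys_to_execute`. A pipeline with branches would need a real dependency traversal rather than a list slice.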