
Dagster: how to re-execute failed steps of a pipeline?


I created a test pipeline and it fails mid-way. I want to re-execute it programmatically, starting from the failed step and continuing forward, without repeating the earlier steps that already succeeded.

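from random import random

from dagster import DagsterInstance, execute_pipeline, pipeline, reexecute_pipeline, solid

instance = DagsterInstance.ephemeral()


@solid
def step1(context, data):
    # data has no upstream solid, so its value must be supplied through run_config.
    # Return lists rather than range/generator objects: a generator is exhausted after
    # one pass and cannot be pickled, which breaks storing it for a later re-execution.
    # 'a' + i would raise TypeError for an int, hence str(i).
    return list(range(10)), ['a' + str(i) for i in range(10)]


@solid
def step2(context, step1op):
    x, y = step1op

    # simulation of noise
    xx = [el * (1 + 0.1 * random()) for el in x]
    # x starts at 0, so the first el is 0.0 and this division raises ZeroDivisionError
    xx2 = [(el - 1) / el for el in xx]
    return list(zip(xx, xx2)), y


@solid
def step3(context, step2op):
    x, y = step2op

    ...
    return x, y


run_config = {...}


@pipeline
def inputs_pipeline():
    step3(step2(step1()))


# First run on the instance: step2 fails; raise_on_error=False returns a failed result instead of raising
result = execute_pipeline(inputs_pipeline, run_config=run_config,
                          instance=instance, raise_on_error=False)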
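The steps that need to run again are the failed step plus everything downstream of it. The plain-Python sketch below (no Dagster involved) computes that selection from a dependency mapping; the function steps_to_reexecute and the hand-written deps dictionary are hypothetical, chosen only to mirror the step1 -> step2 -> step3 chain above.

```python
from collections import deque
from typing import Dict, List


def steps_to_reexecute(deps: Dict[str, List[str]], failed: str) -> List[str]:
    """Return the failed step plus every step downstream of it.

    deps maps each step name to the names of the steps whose outputs it consumes.
    """
    # Invert the edges: step -> steps that consume its output.
    consumers: Dict[str, List[str]] = {step: [] for step in deps}
    for step, upstream in deps.items():
        for up in upstream:
            consumers.setdefault(up, []).append(step)

    # Breadth-first walk from the failed step collects everything it feeds.
    selected = {failed}
    queue = deque([failed])
    while queue:
        for child in consumers.get(queue.popleft(), []):
            if child not in selected:
                selected.add(child)
                queue.append(child)

    # Preserve the order in which steps appear in deps.
    return [step for step in deps if step in selected]


# Hand-written dependencies mirroring step3(step2(step1())).
deps = {"step1": [], "step2": ["step1"], "step3": ["step2"]}
keys = [step + ".compute" for step in steps_to_reexecute(deps, "step2")]
```

Appending the ".compute" suffix, as in the step keys used in the answer, gives keys == ['step2.compute', 'step3.compute']. A breadth-first walk is used because a failed step can feed several branches, and each branch has to be re-run.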

Solution

  • Programmatic re-execution of part of a pipeline requires the ID of the parent run (the failed run), which is available from the instance:

    parent_run_id = instance.get_runs()[0].run_id
    

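    Then re-execute the pipeline, running only the failed step and the step downstream of it. The output of step1 is loaded from the parent run instead of being recomputed, so the parent run must have persisted its outputs (for example, with filesystem storage configured in run_config):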

    result = reexecute_pipeline(inputs_pipeline, parent_run_id=parent_run_id,
                                step_keys_to_execute=['step2.compute', 'step3.compute'],
                                run_config=run_config, instance=instance)
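The idea behind re-execution can be illustrated without Dagster. The sketch below is hypothetical: STORAGE, run_steps and the toy steps are invented for illustration and are not Dagster's implementation. It only borrows the names parent_run_id and step_keys_to_execute. The key assumption it models is that the outputs of successful steps are stored under the parent run's ID, so a later run can load them instead of recomputing them.

```python
import uuid
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for persistent storage: run_id -> {step_name: output}.
STORAGE: Dict[str, Dict[str, Any]] = {}


def run_steps(
    steps: Dict[str, Callable[..., Any]],
    deps: Dict[str, List[str]],
    parent_run_id: Optional[str] = None,
    step_keys_to_execute: Optional[List[str]] = None,
) -> Tuple[str, Optional[str]]:
    """Run steps in dict order and return (run_id, failed_step_or_None).

    Steps missing from step_keys_to_execute are not run again: their
    outputs are copied from the parent run's stored results.
    """
    run_id = str(uuid.uuid4())
    outputs = STORAGE.setdefault(run_id, {})
    for name, fn in steps.items():
        if step_keys_to_execute is not None and name not in step_keys_to_execute:
            # Raises KeyError if the parent run never produced this output.
            outputs[name] = STORAGE[parent_run_id][name]
            continue
        inputs = [outputs[upstream] for upstream in deps[name]]
        try:
            outputs[name] = fn(*inputs)
        except Exception:
            # Stop at the first failure; earlier outputs stay in STORAGE.
            return run_id, name
    return run_id, None


# Usage: step2 fails on its first call only; calls counts how often each step runs.
calls = {"step1": 0, "step2": 0, "step3": 0}


def step1():
    calls["step1"] += 1
    return [1, 2, 3]


def step2(xs):
    calls["step2"] += 1
    if calls["step2"] == 1:
        raise RuntimeError("simulated failure")
    return [x * 10 for x in xs]


def step3(xs):
    calls["step3"] += 1
    return sum(xs)


steps = {"step1": step1, "step2": step2, "step3": step3}
deps = {"step1": [], "step2": ["step1"], "step3": ["step2"]}

parent_run_id, failed = run_steps(steps, deps)
retry_run_id, failed_again = run_steps(
    steps, deps, parent_run_id=parent_run_id, step_keys_to_execute=["step2", "step3"]
)
```

After the retry, step1 has run only once while step2 has run twice, and step3 received step2's new output (sum 60). That is also why the parent run has to persist its outputs: with nothing stored under parent_run_id, the skipped step1 would have no value to hand to step2.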