How to avoid running the rest of a dagster pipeline under certain conditions


Say I have two solids in Dagster connected in a pipeline. The first solid does some processing and either generates a valid input, so that the rest of the pipeline executes, or generates an invalid input that should not be processed further. To achieve this, I raise an error when the data meets the invalid condition, so the pipeline stops and the rest of the solids are skipped.

Raising an error to solve my use case seems hacky, is there a way that I can skip the execution of the rest of the pipeline without resorting to exceptions?

from dagster import solid, pipeline

@solid
def solid_1(context, x: int):
    y = x + 1

    if y % 2 == 0:
        # Python 3 cannot raise a bare string; raise an exception instance instead
        raise ValueError("No even number is further processed")

    return y

@solid
def solid_2(context, y:int):
    return y**2

@pipeline
def toy_pipeline():
    solid_2(solid_1())

In this quite contrived example, solid_2 should only be executed when the output from the first solid is odd.

In my actual use case, the first solid polls a DB and sometimes finds no data to process. In this case it makes sense not to mark the execution as failed but rather as a success. It could be possible to check in each downstream solid whether the data meets the conditions, yet this quickly adds boilerplate. It would be ideal to have a way to skip the execution of all downstream solids when the solid that receives the data finds no data to process.


Solution

  • To achieve the behavior you want, the output can be marked optional using the is_required=False argument on the corresponding OutputDefinition. This means that the output does not necessarily have to be yielded by the solid.

    If an optional output is not yielded, all downstream solids that depend on that output are simply skipped. This is useful both for short-circuiting a pipeline, which is your use case, and for more complicated branching logic. Pipeline runs are not marked as failed when solids are skipped.

    You were using type hints to define input and output types, but since you need to specify the is_required argument, you need to use an explicit OutputDefinition.

    from dagster import pipeline, solid, InputDefinition, OutputDefinition, Output
    from typing import List
    
    def query_db():
        # Placeholder for the real DB query; here it finds no rows
        return []
    
    @solid(output_defs=[OutputDefinition(List[int], 'data', is_required=False)])
    def solid_1(context):
        rows = query_db()
    
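        # Yield the optional output only when there is data;
        # when nothing is yielded, downstream solids are skipped
        if len(rows) > 0:
            yield Output(rows, output_name="data")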
    
    
    @solid
    def solid_2(context, data: List[int]):
        context.log.info(str(data))
    
    
    @pipeline
    def my_pipeline():
        solid_2(solid_1())
    

    The solid solid_2 can also be defined using an InputDefinition instead of type hints. The type hints are syntactic sugar for InputDefinitions:

    @solid(input_defs=[InputDefinition('data', List[int])])
    def solid_2(context, data):
        context.log.info(str(data))
        # Process data
        pass
    

    As a side note: In general, exceptions are the correct way to mark a solid as failed, and are not considered hacky in Dagster code.
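
To make the skip semantics described above concrete, here is a minimal, dependency-free toy model. It is not Dagster code and not how Dagster is implemented; the names `run_chain`, `poll_db`, `process` and `report` are hypothetical and exist only for illustration. It assumes a simple linear chain where each step may yield zero or one output, and shows that a step yielding nothing still counts as a success while everything downstream is marked as skipped.

```python
from typing import Callable, Dict, Iterator, List, Tuple

# Hypothetical stand-in for a solid: takes one input, may yield zero or one output.
Step = Callable[[object], Iterator[object]]


def run_chain(steps: List[Tuple[str, Step]], first_input: object) -> Dict[str, str]:
    """Run steps in order; once a step yields nothing, skip every later step."""
    status: Dict[str, str] = {}
    value = first_input
    has_value = True
    for name, step in steps:
        if not has_value:
            # The upstream output was never produced, so this step does not run.
            status[name] = "skipped"
            continue
        outputs = list(step(value))
        # Yielding nothing is not an error: the step itself still succeeded.
        status[name] = "success"
        if outputs:
            value = outputs[0]
        else:
            has_value = False
    return status


def poll_db(rows):
    # Yield the optional output only when there is data to process.
    if rows:
        yield rows


def process(rows):
    yield [r * 2 for r in rows]


def report(rows):
    yield len(rows)


CHAIN = [("poll_db", poll_db), ("process", process), ("report", report)]

empty_run = run_chain(CHAIN, [])        # the poll finds nothing
full_run = run_chain(CHAIN, [1, 2, 3])  # the poll finds rows
```

In `empty_run`, `poll_db` is recorded as a success and the two later steps as skipped; in `full_run`, all three steps succeed. No exception is involved in either case, which mirrors the "no data is not a failure" requirement from the question.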