How to rewrite a Python script as Dagster-friendly code


I have this simple Python script. How could I rewrite it so that it works in Dagster?

import logging

from mypackage import function1, function2, function3, function4, function5


def main():
    try:
        function1()
        function2()
    except Exception as e:
        logging.exception(e)
        function4()
    else:
        function5()

if __name__ == '__main__':
    main()

This is what I've tried so far, but there is still a long way to go:

import logging

from dagster import success_hook, failure_hook, solid
from mypackage import function1, function2, function3, function4, function5


@solid
def dag_function1() -> bool:
    myvar1 = True
    function1()
    return myvar1


@solid
def dag_function2() -> bool:
    myvar2 = True
    function2()
    return myvar2


@solid
def dag_function3() -> bool:
    myvar3 = True
    function3()
    return myvar3


@failure_hook
def dag_function5():
    logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
    function5()


@success_hook
def dag_function4():
    function4()


def main():
    dag_function3(dag_function1(), dag_function2())

I have tried something like this, but Dagster throws an error: dagster.core.errors.DagsterInvariantViolationError: No jobs, pipelines, graphs, or repositories found


Solution

  • In order to pass outputs of solids to inputs of other solids, you'll need to create a pipeline that defines the dependencies between inputs and outputs.

    From there, you'll be able to execute the pipeline:

    import logging
    
    from dagster import success_hook, failure_hook, solid, pipeline, execute_pipeline
    
    from mypackage import function1, function2, function3, function4, function5
    
    
    @solid
    def dag_function1() -> bool:
        myvar1 = True
        function1()
        return myvar1
    
    
    @solid
    def dag_function2() -> bool:
        myvar2 = True
        function2()
        return myvar2
    
    
    @solid
    def dag_function3(input_1, input_2) -> bool:
        myvar3 = True
        function3()
        return myvar3
    
    
    @failure_hook
    def dag_function5(context):
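        # logging.exception() only attaches a traceback inside an except block,
        # so pass the captured exception explicitly via exc_info.
        logging.error('Solid failed', exc_info=context.solid_exception)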
        function5()
    
    
    @success_hook
    def dag_function4(context):
        function4()
    
    
    @pipeline(hook_defs={dag_function5, dag_function4})
    def my_pipeline():
        dag_function3(dag_function1(), dag_function2())
    
    if __name__ == '__main__':
        execute_pipeline(my_pipeline)
    

    That said, since 0.13.0 Dagster has moved to a new set of core APIs (jobs, ops, and graphs). The Dagster migration guide details how solids and pipelines map to ops and jobs.