I have this simple Python script. How could I rewrite it so that it works in Dagster?
import logging
from mypackage import function1, function2, function3, function4, function5
def main():
    """Run function1 and function2, then branch on whether either one raised.

    If an exception escapes function1 or function2, it is logged with its
    traceback and function4 is called. If both complete without raising,
    function5 is called instead.
    """
    try:
        function1()
        function2()
    except Exception as e:
        # Broad catch is deliberate here: this is the script's top-level
        # boundary, and the failure is logged before the fallback runs.
        logging.exception(e)
        function4()
    else:
        function5()
# Run main() only when the file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
This is what I've tried so far, but there is still a long way to go:
import logging
from dagster import success_hook, failure_hook
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
    """Call function1 and return True as this solid's output."""
    myvar1 = True
    function1()
    return myvar1
@solid
def dag_function2() -> bool:
    """Call function2 and return True as this solid's output."""
    myvar2 = True
    function2()
    return myvar2
@solid
def dag_function3() -> bool:
    """Call function3 and return True as this solid's output."""
    myvar3 = True
    function3()
    return myvar3
@failure_hook
def dag_function5():
    """Failure hook: log a placeholder message, then call function5."""
    # The message is a placeholder; the raised exception is not available here.
    logging.exception('NOT SURE HOW TO ACCESS MY EXCEPTION')
    function5()
@success_hook
def dag_function4():
    """Success hook: call function4."""
    function4()
def main():
    """Call the three solids directly, passing the first two results to dag_function3."""
    # NOTE(review): this is a plain function rather than a pipeline/job
    # definition, and dag_function3 is declared without inputs yet receives
    # two arguments here -- presumably related to the reported error; confirm.
    dag_function3(dag_function1(), dag_function2())
I have tried something like this, but Dagster throws an error: `dagster.core.errors.DagsterInvariantViolationError: No jobs, pipelines, graphs, or repositories found`.
In order to pass outputs of solids to inputs of other solids, you'll need to create a pipeline that defines the dependencies between inputs and outputs.
From there, you'll be able to execute the pipeline:
import logging
from dagster import success_hook, failure_hook, solid, pipeline, execute_pipeline
from mypackage import function1, function2, function3, function4, function5
@solid
def dag_function1() -> bool:
    """Call function1 and return True as this solid's output."""
    myvar1 = True
    function1()
    return myvar1
@solid
def dag_function2() -> bool:
    """Call function2 and return True as this solid's output."""
    myvar2 = True
    function2()
    return myvar2
@solid
def dag_function3(input_1, input_2) -> bool:
    """Call function3 and return True as this solid's output.

    Args:
        input_1: Output of an upstream solid. Not read by the body; declaring
            it lets the pipeline wire an upstream output into this solid.
        input_2: Output of a second upstream solid, likewise not read.
    """
    myvar3 = True
    function3()
    return myvar3
@failure_hook
def dag_function5(context):
    """Failure hook: log the failed solid's exception, then call function5.

    Args:
        context: Hook context supplied by Dagster; ``context.solid_exception``
            is read to obtain the exception to log.
    """
    exc = context.solid_exception
    # logging.exception() takes its traceback from the exception currently
    # being handled. A hook is not running inside an ``except`` block, so the
    # exception object is passed explicitly through ``exc_info`` instead.
    logging.error(exc, exc_info=exc)
    function5()
@success_hook
def dag_function4(context):
    """Success hook: call function4.

    Args:
        context: Hook context supplied by Dagster; not read by this hook.
    """
    # NOTE(review): the standalone script calls function4 on failure and
    # function5 on success, while the hook names here pair them the other way
    # round -- confirm which mapping is intended.
    function4()
# NOTE(review): hooks passed via ``hook_defs`` on a pipeline are documented as
# applying to every solid in it, so they presumably fire once per solid rather
# than once per run -- confirm against the Dagster hooks documentation.
@pipeline(hook_defs={dag_function5, dag_function4})
def my_pipeline():
    """Wire the solids: dag_function1 and dag_function2 feed dag_function3."""
    dag_function3(dag_function1(), dag_function2())
# Execute the pipeline in-process when the file is run as a script.
if __name__ == '__main__':
    execute_pipeline(my_pipeline)
That being said, since version 0.13.0 Dagster has migrated to a new set of core APIs (jobs, ops, and graphs). The Dagster documentation includes a migration guide that details how solids and pipelines map to ops and jobs.