
Is it possible to create dynamic jobs with Dagster?


Consider this example - you need to load table1 from a source database, do some generic transformations (like converting time zones for timestamped columns), and write the resulting data into Snowflake. This is an easy one and can be implemented using 3 Dagster ops.

Now, imagine you need to do the same thing but with 100s of tables. How would you do it with Dagster? Do you literally need to create 100 jobs/graphs? Or can you create one job that will be executed 100 times? Can you throttle how many of these jobs will run at the same time?


Solution

  • You have two main options for doing this:

    1. Use a single job with Dynamic Outputs:

    With this setup, all of your ETLs would happen in a single job. You would have an initial op that yields a DynamicOutput for each table name you want to process, and feed those outputs into a set of ops (probably organized into a graph) that run on each individual DynamicOutput.

    Depending on what executor you're using, it's possible to limit the overall step concurrency (for example, the default multiprocess_executor supports this option).
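
    As a rough sketch of that pattern (the op names and the hard-coded table list below are just illustrative), the dynamic fan-out could look something like this:

        from dagster import DynamicOut, DynamicOutput, job, op


        @op(out=DynamicOut(str))
        def table_names():
            # in practice this list might come from config or an introspection query
            for name in ["table1", "table2", "table3"]:
                yield DynamicOutput(name, mapping_key=name)


        @op
        def etl_one_table(table_name: str):
            # extract, transform, and load a single table
            ...


        @job
        def dynamic_etl():
            # .map() runs etl_one_table once per DynamicOutput
            table_names().map(etl_one_table)

        # with the default multiprocess executor, step concurrency can be capped
        # via run config along these lines (e.g. in Dagit's launchpad):
        #
        #   execution:
        #     config:
        #       multiprocess:
        #         max_concurrent: 4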

    2. Create a configurable job (I think this is more likely what you want):
        from dagster import job, op
        import pandas as pd
        
        
        @op(config_schema={"table_name": str})
        def extract_table(context) -> pd.DataFrame:
            table_name = context.op_config["table_name"]
            # do some load...
            return pd.DataFrame()
        
        
        @op
        def transform_table(table: pd.DataFrame) -> pd.DataFrame:
            # do some transform...
            return table
        
        
        @op(config_schema={"table_name": str})
        def load_table(context, table: pd.DataFrame):
            table_name = context.op_config["table_name"]
            # load to snowflake...
        
        
        @job
        def configurable_etl():
            load_table(transform_table(extract_table()))
        
        # this is what the configuration would look like to extract from table
        # src_foo and load into table dest_foo
        configurable_etl.execute_in_process(
            run_config={
                "ops": {
                    "extract_table": {"config": {"table_name": "src_foo"}},
                    "load_table": {"config": {"table_name": "dest_foo"}},
                }
            }
        )
    

    Here, you create a job that can be pointed at a source table and a destination table by giving the relevant ops a config schema. Depending on those config options (which are provided through the run config when you create a run), your job will operate on different source/destination tables.

    The example shows explicitly running this job using Python APIs, but if you're running it from Dagit, you'll also be able to input the YAML version of this config there. If you want to simplify the config schema (as it's pretty nested as shown), you can always create a Config Mapping to make the interface nicer :)
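
    For instance (the simplified keys "source" and "destination" below are just names chosen for this sketch), a config mapping could look roughly like this:

        from dagster import config_mapping, job


        @config_mapping(config_schema={"source": str, "destination": str})
        def simplified_config(val):
            # translate the flat outer config into the nested per-op run config
            return {
                "ops": {
                    "extract_table": {"config": {"table_name": val["source"]}},
                    "load_table": {"config": {"table_name": val["destination"]}},
                }
            }


        # same op graph as above, now with the config mapping attached
        @job(config=simplified_config)
        def configurable_etl():
            load_table(transform_table(extract_table()))

        # the run config callers supply is now flat:
        configurable_etl.execute_in_process(
            run_config={"source": "src_foo", "destination": "dest_foo"}
        )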

    From here, you can limit run concurrency by supplying a unique tag to your job, and using a QueuedRunCoordinator to limit the maximum number of concurrent runs for that tag.
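
    A minimal sketch of that setup, assuming a made-up tag of job_family: table_etl, would be to tag the job in code and then reference that tag from the QueuedRunCoordinator config in your instance's dagster.yaml:

        from dagster import job


        # every run of this job carries the tag, so the run coordinator can throttle it
        @job(tags={"job_family": "table_etl"})
        def configurable_etl():
            load_table(transform_table(extract_table()))

        # and in dagster.yaml, something along these lines:
        #
        #   run_coordinator:
        #     module: dagster.core.run_coordinator
        #     class: QueuedRunCoordinator
        #     config:
        #       tag_concurrency_limits:
        #         - key: "job_family"
        #           value: "table_etl"
        #           limit: 5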