Is it possible to generate jobs in Dagster dynamically using configuration from a database?


Currently, my database has multiple departments. I need to apply a data pipeline to all of these departments, each with a different configuration.

I want to load the configuration for each department from a database, then use these configurations to generate a list of jobs in Dagster.

For example, I have 3 departments:

Department1: Configuration1

Department2: Configuration2

Department3: Configuration3

This information is stored in my database.

How can I load this information and dynamically create 3 jobs (pipelines):

Pipeline1 for Department1 with Configuration1

Pipeline2 for Department2 with Configuration2

Pipeline3 for Department3 with Configuration3

Is it possible to do this in Dagster? I can do it with Airflow (by dynamically generating DAGs), but I am not sure how to do it in Dagster. I cannot load database configuration outside of an op/job in Dagster.


Solution

  • In Dagster, your @repository function is just a regular function, so you can run arbitrary code in there to query your database and generate jobs dynamically:

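    from dagster import repository

    @repository
    def my_repo():
        # Placeholder: replace with your own query to your database
        configs = fetch_configs_from_database()
        jobs = []
        for config in configs:
            # get_job_for_my_config is your own factory that returns one job
            jobs.append(get_job_for_my_config(config))
        return jobs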
    

    If you expect the database call to be slow, you can look into making your repository lazy-loaded; the Dagster RepositoryDefinition docs describe how to do this.
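
The two pieces the repository function relies on could be sketched roughly as follows. This is only an illustration under stated assumptions, not the answer author's code: the `department_config` table, its `department` and `settings` columns, the JSON encoding of `settings`, the use of SQLite, and the helpers `load_department_configs` and `make_demo_connection` are all hypothetical. Only `get_job_for_my_config` is a name taken from the answer, and its body here is a placeholder.

```python
import json
import sqlite3


def load_department_configs(conn):
    """Return one dict per department from a hypothetical department_config table."""
    rows = conn.execute(
        "SELECT department, settings FROM department_config ORDER BY department"
    ).fetchall()
    # Assumption: settings is stored as a JSON string.
    return [{"department": name, "settings": json.loads(raw)} for name, raw in rows]


def get_job_for_my_config(config):
    """Build one Dagster job for a single department config."""
    # Imported here so the loader above can be used without Dagster installed.
    from dagster import job, op

    department = config["department"]

    @op(name=f"process_{department}")
    def process(context):
        # Placeholder body: real pipeline logic would use config["settings"].
        context.log.info(f"{department}: {config['settings']}")

    @job(name=f"pipeline_{department}")
    def department_job():
        process()

    return department_job


def make_demo_connection():
    """In-memory database seeded with three made-up departments."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE department_config (department TEXT, settings TEXT)")
    conn.executemany(
        "INSERT INTO department_config VALUES (?, ?)",
        [(f"Department{i}", json.dumps({"batch_size": i * 100})) for i in (1, 2, 3)],
    )
    return conn
```

Inside the `@repository` function, a loader like this would fill the database-query step, and the loop would call `get_job_for_my_config` once per returned dict. Each generated op and job gets a unique name derived from the department, since Dagster needs job names within a repository to be distinct. Department names are assumed to contain only letters, digits, and underscores so that they form valid Dagster names.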