Currently, my database has multiple departments. I need to apply a data pipeline to each of these departments, each with a different configuration.
I want to load the configuration for each department from the database, then use these configurations to generate a list of jobs in Dagster.
For example, I have three departments (tenants):
Department1: Configuration1
Department2: Configuration2
Department3: Configuration3
This information is stored in my database.
How can I load it and dynamically create three jobs (pipelines):
Pipeline1 for Department1 with Configuration1
Pipeline2 for Department2 with Configuration2
Pipeline3 for Department3 with Configuration3
Is it possible to do this in Dagster? I can do it in Airflow by dynamically generating DAGs, but I'm not sure how to do it in Dagster. I cannot load the database configuration outside of an op/job in Dagster.
In Dagster, your @repository function is just a regular function, so you can run arbitrary code in there to query your database and generate jobs dynamically:
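from dagster import repository

@repository
def my_repo():
    # Placeholder: replace with your own database query
    configs = load_department_configs()
    jobs = []
    for config in configs:
        # get_job_for_my_config is your own factory that builds one job per config
        jobs.append(get_job_for_my_config(config))
    return jobs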
If you expect the database call to be slow, you can make the repository lazy-loaded; the Dagster RepositoryDefinition docs describe how to do this.