I am trying to create Dynamic DAGs and then get them to the scheduler. I tried the reference from https://www.astronomer.io/guides/dynamically-generating-dags/ which works well. I changed it a bit as in the below code. Need help in debugging the issue.
I tried 1. Test run the file. The Dag gets executed and the globals() is printing all the DAGs objects. But somehow not listing in the list_dags or in the UI
from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
def create_dag(dag_id,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval="@hourly",
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag_number=dag_number)
return dag
def fetch_new_dags(**kwargs):
for n in range(1, 10):
print("=====================START=========\n")
dag_id = "abcd_" + str(n)
print (dag_id)
print("\n")
globals()[dag_id] = create_dag(dag_id, n, default_args)
print(globals())
default_args = {
'owner': 'diablo_admin',
'depends_on_past': False,
'start_date': datetime(2019, 8, 8),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'trigger_rule': 'none_skipped'
#'schedule_interval': '0 * * * *'
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'
check_for_dags = PythonOperator(dag=dag,
task_id='tst_dyn_dag',
provide_context=True,
python_callable=fetch_new_dags
)
check_for_dags
Expected to create 10 DAGs dynamically and added to the scheduler.
I guess doing the following would fix it
testDynDags
dag and tst_dyn_dags
task (instantiation and invocation)fetch_new_dags(..)
method with requisite arguments in global scopeExplanation
So clearly, wrapping dag-generation code inside an Airflow task itself makes no sense.
UPDATE-1
From what is indicated in comments, I infer that the requirement dictates that you revise your external source that feeds inputs (how many dags or tasks to create) to your DAG / task-generation script. While this is indeed a complex use-case, but a simple way to achieve this is to create 2 separate DAGs.
You can take inspiration from the Adding DAGs based on Variable value
section