I have two different DAGs that need to run at different frequencies: dag1 needs to run weekly, and dag2 needs to run daily. On every occurrence when dag1 runs, dag2 should run only after dag1 has finished.
I have defined the two DAGs as follows, in two different Python modules.
# dag1.py
import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='00 10 * * 1',
         catchup=True
         ) as dag:
    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )
# dag2.py (same imports as dag1.py)
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='5 10 * * *',
         catchup=True
         ) as dag:
    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='''
            cd {}/scraper && scrapy crawl crawl_dataset
        '''.format(PROJECT_PATH)
    )
Currently I have manually set a gap of 5 minutes between the two DAGs. This setup is not working, and it also lacks a way to make dag2 dependent on dag1 as required.
I had checked the answers here and here but was not able to figure it out.
NOTE: the schedule_intervals are indicative only. The intention is to run dag1 every Monday at a fixed time and to run dag2 daily at a fixed time; on Mondays, dag2 should run only after dag1 finishes.
Each DAG has multiple tasks as well.
After a lot of struggle with understanding the flow, I finally came up with the answer myself (not sure how optimal it is, but it works for me currently). Thanks to this answer and the branching docs. Here is my solution using the BranchPythonOperator.
import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': dt.datetime(2019, 8, 20),
    'concurrency': 1,
    'retries': 0
}


def branch_tasks(execution_date, **kwargs):
    '''
    Branch the tasks based on weekday.
    '''
    # check if the execution day is 'Saturday'
    if execution_date.weekday() == 5:
        return ['crawl_params', 'crawl_dataset']

    return 'crawl_dataset'


with DAG('dag1',
         default_args=DEFAULT_ARGS,
         schedule_interval='00 10 * * *',
         catchup=False
         ) as dag:
    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
        trigger_rule='none_failed'
    )

    BRANCH_OP = BranchPythonOperator(
        task_id='branch_tasks',
        provide_context=True,
        python_callable=branch_tasks,
        dag=dag
    )

    BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
    CRAWL_PARAMS.set_downstream(CRAWL_DATASET)
Here, the BranchPythonOperator uses the branch_tasks function to choose which tasks to run based on the day of the week.
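The branch callable can be sanity-checked outside Airflow by calling it with plain datetimes (a minimal sketch; the dates below are picked only so that one falls on a Saturday and one on a Monday):
import datetime as dt

# branch_tasks imported from the DAG file above
print(branch_tasks(execution_date=dt.datetime(2019, 8, 24)))  # Saturday
# -> ['crawl_params', 'crawl_dataset']
print(branch_tasks(execution_date=dt.datetime(2019, 8, 26)))  # Monday
# -> 'crawl_dataset'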
Another catch here is that when crawl_params does run (when the condition is true for it), its downstreams will also run; but when it is skipped, its downstreams will be skipped as well. To avoid this, we need to pass trigger_rule='none_failed' to the operator of that task, which means the task should run if none of its upstream tasks have failed (they either succeeded or were skipped).
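If you prefer not to hard-code the string, the same rule can also be referenced through the TriggerRule constant (a minimal sketch, assuming your Airflow version exposes NONE_FAILED, which it should if the string form above is accepted):
from airflow.utils.trigger_rule import TriggerRule

CRAWL_DATASET = BashOperator(
    task_id='crawl_dataset',
    bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
    # run as long as no upstream task failed; skipped upstreams are fine
    trigger_rule=TriggerRule.NONE_FAILED
)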