Tags: python, airflow, scheduling, directed-acyclic-graphs

How to schedule two DAGs with different schedule_intervals so that the second only runs after the first has finished


I have two DAGs that need to run at different frequencies: dag1 needs to run weekly, while dag2 needs to run daily. On every occasion that dag1 runs, dag2 should only start after dag1 has finished.

I have defined the two DAGs as follows, in two different Python modules.

dag1.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='00 10 * * 1',
         catchup=True
        ) as dag:

    CRAWL_PARAMS = BashOperator(
        task_id='crawl_params',
        bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
    )

dag2.py

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
         default_args={
             'owner': 'airflow',
             'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
             'concurrency': 1,
             'retries': 0
         },
         schedule_interval='5 10 * * *',
         catchup=True
        ) as dag:

    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH)
    )

Currently I have manually set a gap of 5 minutes between the two DAGs. This setup is not working, and it also lacks any mechanism to make dag2 dependent on dag1 as required.

I have checked the answers here and here but was not able to figure it out.

NOTE: the schedule_intervals are indicative only. The intention is to run dag1 every Monday at a fixed time and to run dag2 daily at a fixed time; on Mondays, dag2 should run only after dag1 finishes. Each DAG has multiple tasks as well.
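
For reference, a commonly suggested way to express this kind of cross-DAG dependency is an ExternalTaskSensor in dag2 that waits for dag1. The sketch below is illustrative only and is not code from the original setup: the execution_delta, timeout and task names are assumptions, and on days when dag1 has no run the sensor is allowed to skip while the crawl still proceeds via trigger_rule='none_failed'.

import datetime as dt
from os import path

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
         default_args={'owner': 'airflow', 'start_date': dt.datetime(2019, 8, 25)},
         schedule_interval='5 10 * * *',
         catchup=False
        ) as dag:

    # Wait for dag1's crawl_params task. execution_delta=5 minutes points this
    # 10:05 run at a dag1 run scheduled at 10:00; soft_fail marks the sensor as
    # skipped (instead of failed) if no such dag1 run appears before the timeout.
    WAIT_FOR_DAG1 = ExternalTaskSensor(
        task_id='wait_for_dag1',
        external_dag_id='dag1',
        external_task_id='crawl_params',
        execution_delta=dt.timedelta(minutes=5),
        soft_fail=True,
        timeout=60 * 60
    )

    # none_failed lets crawl_dataset run even when the sensor was skipped,
    # i.e. on days when dag1 did not run at all.
    CRAWL_DATASET = BashOperator(
        task_id='crawl_dataset',
        bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
        trigger_rule='none_failed'
    )

    WAIT_FOR_DAG1 >> CRAWL_DATASET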


Solution

  • After a lot of struggle with understanding the flow, I finally came up with the answer myself (not sure how optimal it is, but it works for me for now). Thanks to this answer and the branching docs. Here is my solution using BranchPythonOperator.

    dag1.py

    import datetime as dt
    from os import path
    
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import BranchPythonOperator
    
    PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
    
    DEFAULT_ARGS = {
        'owner': 'airflow',
        'start_date': dt.datetime(2019, 8, 20),
        'concurrency': 1,
        'retries': 0
    }
    
    def branch_tasks(execution_date, **kwargs):
        '''
        Branch the tasks based on weekday.
        '''
        # check if the execution day is 'Saturday'
        if execution_date.weekday() == 5:
            return ['crawl_params', 'crawl_dataset']
    
        return 'crawl_dataset'
    
    with DAG('dag1',
             default_args=DEFAULT_ARGS,
             schedule_interval='00 10 * * *',
             catchup=False
            ) as dag:
    
        CRAWL_PARAMS = BashOperator(
            task_id='crawl_params',
            bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
        )
    
        CRAWL_DATASET = BashOperator(
            task_id='crawl_dataset',
            bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
            trigger_rule='none_failed'
        )
    
        BRANCH_OP = BranchPythonOperator(
            task_id='branch_tasks',
            provide_context=True,
            python_callable=branch_tasks
        )

        BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
        CRAWL_PARAMS.set_downstream(CRAWL_DATASET)
    

    Here, the BranchPythonOperator uses the branch_tasks function to choose which tasks to run based on the day of the week.
    Another catch: when crawl_params does run (i.e. when the condition is true for it), its downstream tasks will also run; but when it is skipped, its downstream tasks will be skipped as well. To avoid this, we need to pass trigger_rule='none_failed' to the downstream task's operator, which means the task should run if none of its upstream tasks have failed (they either succeeded or were skipped).
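
    A quick way to sanity-check the branching logic outside Airflow is to call the callable directly; the dates below are only examples, chosen to hit each branch:

    import datetime as dt

    # Saturday (weekday() == 5) -> both tasks are selected
    print(branch_tasks(dt.datetime(2019, 8, 24)))   # ['crawl_params', 'crawl_dataset']
    # any other day -> only the daily crawl
    print(branch_tasks(dt.datetime(2019, 8, 26)))   # 'crawl_dataset'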