Airflow ExternalTaskSensor with different schedule intervals


Currently I have two DAGs: DAG_A and DAG_B. Both run with schedule_interval=timedelta(days=1).

DAG_A has a task, Task1, which usually takes 7 hours to run, while DAG_B takes only 3 hours.

DAG_B has an ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1") but also uses some other information X that is generated hourly.

What is the best way to increase the frequency of DAG_B so that it runs at least 4 times a day? As far as I know, both DAGs must have the same schedule_interval. However, I want to update X on DAG_B as much as I can.


One possibility is to create another DAG that has an ExternalTaskSensor for DAG_B, but I don't think that's the best way.


Solution

  • If I understood you correctly, your conditions are:

    • Keep running DAG_A daily
    • Run DAG_B n times a day
    • Every time DAG_B runs, it waits for Task1 of DAG_A to be completed

    I think you could easily adapt your current design by instructing ExternalTaskSensor to wait for the desired execution date of DAG_A.

    From the ExternalTaskSensor operator definition:

    Waits for a different DAG or a task in a different DAG to complete for a specific execution_date

    That execution_date could be defined using execution_date_fn parameter:

    execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

    You could define the sensor like this:

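        # Airflow 2.x import path; on 1.10 the module is
        # airflow.sensors.external_task_sensor
        from airflow.sensors.external_task import ExternalTaskSensor

        wait_for_dag_a = ExternalTaskSensor(
            task_id='wait_for_dag_a',
            external_task_id="external_task_1",
            external_dag_id='dag_a_id',
            # The sensor finishes once the task reaches either of these states
            allowed_states=['success', 'failed'],
            execution_date_fn=_get_execution_date_of_dag_a,
            poke_interval=30  # seconds between checks
        )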
    

    Here _get_execution_date_of_dag_a queries the metadata DB with get_last_dagrun to fetch the execution_date of DAG_A's most recent run:

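    from airflow.utils.db import provide_session
    from airflow.models.dag import get_last_dagrun

    @provide_session
    def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
        # Look up the most recent scheduled run of DAG_A; manually triggered
        # runs are ignored unless include_externally_triggered=True is passed.
        dag_a_last_run = get_last_dagrun('dag_a_id', session)
        # Note: get_last_dagrun returns None if DAG_A has no runs yet.
        return dag_a_last_run.execution_date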
    

    I hope this approach helps you out. You can find a working example in this answer.
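
As a variation on the approach above, and only as a sketch: if DAG_A's execution dates always fall on midnight (an assumption that depends on DAG_A's start_date), the target date could be computed from DAG_B's own execution date instead of being queried from the DB. The name previous_daily_run and the choice of the previous day's run are hypothetical; adjust the offset to whichever DAG_A run DAG_B should wait for.

```python
from datetime import datetime, timedelta


def previous_daily_run(exec_date, **kwargs):
    """Map a DAG_B execution date to a DAG_A execution date.

    Assumption: DAG_A runs daily with execution dates at 00:00:00, and
    DAG_B should wait for the run belonging to the previous calendar day.
    """
    # Truncate to midnight of the same day ...
    midnight = exec_date.replace(hour=0, minute=0, second=0, microsecond=0)
    # ... then step back one day to reach the previous daily run.
    return midnight - timedelta(days=1)


# Illustration: four DAG_B runs, 6 hours apart, within one day.
day = datetime(2021, 1, 2)
dag_b_runs = [day + timedelta(hours=6 * i) for i in range(4)]

# Every one of them resolves to the same DAG_A execution date.
targets = {previous_daily_run(d) for d in dag_b_runs}
```

Such a function has the same shape as _get_execution_date_of_dag_a (execution date first, extra context as keyword arguments), so it could be passed as execution_date_fn in its place. The trade-off is that it relies on the schedule alignment assumed above, whereas the DB query follows whatever DAG_A runs actually exist.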