Search code examples
airflowdirected-acyclic-graphs

Airflow tasks only available on the first run


My use case is simple: I have some processes that interacts with some tables in the database. I want to automate the tables creation, so I added a create_table_if_not_exists_task. I only want to run that task in the first DAG run, but not in the following ones, since it's taking DAG time / resources that I could be using somewhere else.

My question is: do I have a clean way to do that in Airflow?

The idea I had is to have an Airflow variable updated with that information and check it in the DAG parsing. Don't like it since it creates a connection to the metadata database in each heartbeat.


Solution

  • You could use a ShortCircuitOperator (or even BranchPythonOperator depending on your pipeline) in front of the task you wish to control, access dag_run object (either directly or through context) in the Python callable, and then check for any previous DagRuns with the DagRun.get_previous_dagrun() method. Something like this (untested though):

    def has_previous_dagrun(dag_run):
        return False if dag_run.get_previous_dagrun() is not None else True
    
    ...
    
    check_if_first_dagrun = ShortCircuitOperator(
        task_id="check_if_first_dagrun",
        python_callable=has_previous_dagrun,
    )