How to Stop an Airflow DAG and Skip Future Processing if File is Empty?


I am working on an Airflow DAG where I need to perform certain processing tasks on a file only if the file is not empty. The workflow should ideally check if the file has content, and if it's empty, the DAG should stop executing further and skip any future processing related to this file.

Here is the simplified structure of my Airflow DAG:

from google.cloud import storage
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def check_file_not_empty():
    client = storage.Client()
    bucket = client.get_bucket(src_bucket_name)
    blob = bucket.get_blob(blob_name)
    if blob.size == 0:
        raise Exception(f"The file {blob_name} in bucket {src_bucket_name} is empty")

def process_file():
    # Code to process the file
    pass

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 21),
    'retries': 1,
}

dag = DAG('file_processing_dag', default_args=default_args, schedule_interval='@daily')

check_file_task = PythonOperator(
    task_id='check_file_not_empty',
    python_callable=check_file_not_empty,
    dag=dag,
)

process_file_task = PythonOperator(
    task_id='process_file',
    python_callable=process_file,
    dag=dag,
)

check_file_task >> process_file_task

It seems that I need some Airflow-specific mechanism to stop the execution, because raising an exception only triggers retries, which is not what I need. I would like to fail fast instead. How can I do that?


Solution

  • You could use the BranchPythonOperator with an if/else condition on whether the file is empty. The callable returns the task_id of the task to run next. Sample callable code:

    def my_branch_func(ti) -> str:
        # file_has_content is a placeholder for your own emptiness check
        if file_has_content:
            return 'process_file'
        else:
            return 'noop_task'


    Or use the ShortCircuitOperator that you mentioned, which skips all downstream tasks when its callable returns a falsy value: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#howto-operator-shortcircuitoperator
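
As a rough illustration of the ShortCircuitOperator route, here is a minimal sketch applied to the DAG from the question. It assumes Airflow 2.x import paths. The bucket and blob names are hypothetical placeholders, and blob_has_content and build_dag are helper names introduced here, not part of Airflow. Treating a missing file the same as an empty one is also an assumption; adjust it if a missing file should fail the run instead.

```python
from datetime import datetime

# Hypothetical placeholders; the question does not define these values.
SRC_BUCKET_NAME = "my-source-bucket"
BLOB_NAME = "incoming/data.csv"


def blob_has_content(size):
    """Return True only for a known, non-zero size in bytes."""
    return size is not None and size > 0


def check_file_not_empty():
    """Callable for ShortCircuitOperator: a falsy result skips downstream tasks."""
    # Imported lazily so the helper above can be used without GCP libraries.
    from google.cloud import storage

    blob = storage.Client().bucket(SRC_BUCKET_NAME).get_blob(BLOB_NAME)
    # get_blob returns None when the object does not exist (assumed "empty" here).
    return blob_has_content(blob.size if blob is not None else None)


def process_file():
    """Placeholder for the real processing step."""


def build_dag():
    """Wire the check and the processing task together (Airflow 2.x assumed)."""
    from airflow import DAG
    from airflow.operators.python import PythonOperator, ShortCircuitOperator

    dag = DAG(
        "file_processing_dag",
        start_date=datetime(2024, 3, 21),
        schedule_interval="@daily",
        default_args={"owner": "airflow", "depends_on_past": False, "retries": 1},
    )
    check_file_task = ShortCircuitOperator(
        task_id="check_file_not_empty",
        python_callable=check_file_not_empty,
        retries=0,  # the check returns a verdict, so there is nothing to retry
        dag=dag,
    )
    process_file_task = PythonOperator(
        task_id="process_file",
        python_callable=process_file,
        dag=dag,
    )
    check_file_task >> process_file_task
    return dag
```

In a real DAG file you would call dag = build_dag() at module level so the scheduler can discover it. With this layout an empty file does not raise at all: the check task succeeds, process_file is marked as skipped, and no retries are triggered.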