airflow, jinja2, google-cloud-composer

How can I dynamically change a bucket name in an Airflow DAG?


I have been using Airflow successfully to read a SQL file from a bucket and pass it to a BigQueryInsertJobOperator. Here is a minimal example to show my currently working setup:

def get_query(file_name):
    # Read the SQL text from the bucket folder under the local data directory
    with open("/home/airflow/gcs/data/my_sql_file_bucket/" + file_name) as f:
        return f.read()

with DAG(
    "my_dag",
    blah_args,
) as dag:

    op1 = BigQueryInsertJobOperator(
        blah_args=blah_args,
        configuration={
            "query": {
                "query": get_query("my_sql_file.sql"),
                blah_all_the_rest
            },
        },
    )

OK, hopefully you get the idea. This works perfectly! But now I need to change the bucket I read from dynamically, based on a variable, and I cannot figure out how to get it working:

def get_query(file_name):
    with open("/home/airflow/gcs/data/{{ var.value.my_dynamic_bucket }}" + file_name) as f:
        f_query = f.read()
        return f_query

This doesn't render, and I understand why: it needs to be in a task. So then I tried:

op1 = BigQueryInsertJobOperator(
    blah_args=blah_args,
    configuration={
        "query":{
            "query": open({{  var.value.my_dynamic_bucket  }}).read(),
            blah_all_the_rest
        },
    }
    )

but I suppose this fails for the same reason as before. I even tried setting template_searchpath to {{ var.value.my_dynamic_bucket }} in default_args, but it won't render there either.

How can I use an Airflow variable to determine the name of the bucket I need to read my SQL files from? Thank you in advance.


Solution

  • If a global-scope variable is acceptable for your DAG, you can use Airflow Variables: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html

    Define a Variable, say bucket_name, with the relevant value.

    from airflow.models import Variable
    
    
    # Inside your function, read the current value
    bucket_name = Variable.get("bucket_name")
    

    You can also update the Variable's value if needed, for example from another DAG or through the UI.

    Variable.update("bucket_name","new_bucket")
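
Applied to the get_query helper from the question, this could look roughly like the sketch below. It assumes the Variable is named my_dynamic_bucket (the name used in the question, rather than bucket_name) and holds only the folder name under /home/airflow/gcs/data. The base_dir and lookup parameters and the airflow_variable helper are hypothetical additions, there only so the function can be tried without a running Airflow instance. Keep in mind that Variable.get runs whenever get_query is called; if that happens while the operator arguments are being built, the lookup runs each time the DAG file is parsed.

```python
import os

# Base folder taken from the question
GCS_DATA_DIR = "/home/airflow/gcs/data"


def airflow_variable(key):
    """Look up an Airflow Variable by key (hypothetical helper)."""
    # Imported lazily so this module can be loaded where Airflow is not installed
    from airflow.models import Variable

    return Variable.get(key)


def get_query(file_name, base_dir=GCS_DATA_DIR, lookup=airflow_variable):
    """Read a SQL file from the folder named by the my_dynamic_bucket Variable."""
    # Assumption: the Variable holds only the folder name, without slashes
    bucket = lookup("my_dynamic_bucket")
    path = os.path.join(base_dir, bucket, file_name)
    with open(path) as f:
        return f.read()
```

With the defaults left in place, the operator configuration from the question can stay as it is: "query": get_query("my_sql_file.sql").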