My goal is to create a DAG with BigQueryOperator tasks that I can trigger in Airflow with a parametrized destination table for my SQL. I have read many topics about passing parameters to PythonOperator tasks so they can be called with --conf in Airflow, but I don't know how to apply the same approach to an argument of a BigQueryOperator.
My dag.py looks like this:
import airflow
from airflow import DAG
# blabla.. (other imports; default_args is defined here)
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(
    "TestPython",
    schedule_interval=None,
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:

    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        destination_dataset_table=" **variableTable** ",  # <-- the value I want to parametrize
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    stepOne
I would like to know whether there is a way to set the destination table name with an airflow trigger_dag command, or maybe something else (and, of course, with a default value when it is not set, so the DAG can still be uploaded to my DAG bucket).
If something is unclear, I can provide more details and describe the approaches I have already tried.
Yes, you can pass a run-time value to the "destination_dataset_table" because it is a templated field.
For example:
my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y%m%d') }}"

stepOne = BigQueryOperator(
    task_id="stepOne",
    sql="SELECT * FROM `testTable` ",
    destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
)
In my example I change the table name using an Airflow macro that manipulates dates, but you can use many other templated values, such as an XCom pull:
"{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"
For your specific use case, I think this answer should work.
You can pass parameters from the CLI using --conf '{"key": "value"}' and then reference them in the DAG file as {{ dag_run.conf["key"] }} in any templated field.
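Here is a minimal sketch of how that could look for your case (the conf key destination_table, the project/dataset names and the default table are assumptions to adapt; the CLI syntax shown is the Airflow 1.x trigger_dag form, on Airflow 2 it is airflow dags trigger):

# Trigger manually with:
#   airflow trigger_dag TestPython --conf '{"destination_table": "project_id.dataset_id.my_table"}'

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

stepOne = BigQueryOperator(
    task_id="stepOne",
    sql="SELECT * FROM `testTable` ",
    # Fall back to a default table when no conf is passed (e.g. a scheduled run,
    # where dag_run.conf may be empty or None depending on the Airflow version).
    destination_dataset_table=(
        "{{ dag_run.conf.get('destination_table', 'project_id.dataset_id.default_table') "
        "if dag_run and dag_run.conf else 'project_id.dataset_id.default_table' }}"
    ),
    write_disposition="WRITE_TRUNCATE",
    use_legacy_sql=False,
)

This way the DAG still parses and runs with the default table when nothing is passed, and the value from --conf overrides it when you trigger the DAG manually.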