I need to execute multiple queries through a single BigQuery operator inside my DAG, which runs in Google Cloud Composer. The DAG will have many operators, and each operator needs to execute multiple queries sequentially. I am using BigQueryInsertJobOperator. When I pass it a list of queries, it executes only the first query in the list and silently ignores the rest, without raising any error.
I cannot use the trick of combining all the queries into a single string, because my 2nd or 3rd queries can contain DECLARE statements, which fail when several queries are combined into one string. Also, if I create the operator in a loop, how can I set the dependencies, given that I will have many operators and each operator will have multiple queries?
You can create a `queries` folder in your DAG folder, with the following structure:
```
project
└── dag_folder
    └── queries
        ├── object1
        │   └── query.sql
        └── object2
            └── query.sql
```
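The DAG code hard-codes the list of objects, but the list could also be derived from this folder layout. A minimal sketch, assuming every object folder sits directly under `queries/` and holds one `query.sql`; `discover_objects` is a hypothetical helper, not part of Airflow:

```python
import tempfile
from pathlib import Path
from typing import List


def discover_objects(queries_dir: str) -> List[str]:
    """Return the names of sub-folders of `queries_dir` that contain a query.sql.

    The result is sorted so the order is deterministic; that order is the
    order in which the operators would later be chained.
    """
    root = Path(queries_dir)
    return sorted(
        child.name
        for child in root.iterdir()
        if child.is_dir() and (child / "query.sql").is_file()
    )


if __name__ == "__main__":
    # Build a throwaway copy of the layout and list its objects.
    with tempfile.TemporaryDirectory() as tmp:
        for name in ("object2", "object1"):
            (Path(tmp) / name).mkdir()
            (Path(tmp) / name / "query.sql").write_text("SELECT 1;")
        (Path(tmp) / "notes").mkdir()  # no query.sql, so it is ignored
        print(discover_objects(tmp))  # ['object1', 'object2']
```

In the DAG, `discover_objects(f'{dag_folder}/your_dag_folder/queries')` could replace the literal list. The trade-off is that adding or removing a folder changes the DAG's shape on the next parse.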
Then, in your DAG, you can use code like this:
```python
import os

import airflow
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from jinja2 import Template


def get_jinja_template(file_path: str) -> Template:
    # Load a SQL file as a Jinja2 template so it can be rendered with params.
    with open(file_path) as fp:
        return Template(fp.read())


with airflow.DAG(
    "your_dag",
    default_args=your_args,  # placeholder: your own default_args dict
    schedule_interval=None,
) as dag:
    # You can configure this list for your DAG, for example with Airflow
    # Variables, or elsewhere if you prefer.
    your_objects = [
        'object1',
        'object2',
    ]

    # Path of the environment's DAGs folder.
    dag_folder = os.getenv("DAGS_FOLDER")

    query_operators = []
    for obj in your_objects:
        # One query file per object: queries/<object>/query.sql
        query_path = f'{dag_folder}/your_dag_folder/queries/{obj}/query.sql'
        query = get_jinja_template(query_path).render(
            project_id='project',
            param2='param2',
        )

        execute_query = BigQueryInsertJobOperator(
            # task_id must be unique within the DAG, so derive it from the object.
            task_id=f'execute_queries_{obj}',
            configuration={
                "query": {
                    "query": query,
                    "useLegacySql": False,
                }
            },
            location='EU',
        )
        query_operators.append(execute_query)

    # Run the operators one after another: object1 >> object2 >> ...
    chain(*query_operators)
```
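Because the operators are created in a loop, each one needs a distinct `task_id`; Airflow refuses to load a DAG in which two tasks share the same id. A minimal sketch of deriving ids from object names, where `make_task_id` and `check_unique` are hypothetical helpers and the allowed-character rule (letters, digits, `_`, `.`, `-`, at most 250 characters) is an assumption about Airflow's task id validation, applied conservatively:

```python
import re
from typing import Iterable, List

# Assumption: Airflow accepts task ids made of [A-Za-z0-9_.-], up to 250 chars.
_INVALID_CHARS = re.compile(r"[^A-Za-z0-9_.-]")
_MAX_TASK_ID_LEN = 250


def make_task_id(obj: str, prefix: str = "execute_queries") -> str:
    """Build the task id for one object's BigQueryInsertJobOperator."""
    cleaned = _INVALID_CHARS.sub("_", obj.strip())  # e.g. "a/b c" -> "a_b_c"
    task_id = f"{prefix}_{cleaned}" if cleaned else prefix
    return task_id[:_MAX_TASK_ID_LEN]


def check_unique(objects: Iterable[str]) -> List[str]:
    """Return the task ids for `objects`, failing early if two of them collide.

    Sanitizing can map different names to the same id ("object 1" and
    "object_1"), so the check runs before any operator is created.
    """
    ids = [make_task_id(o) for o in objects]
    dupes = {i for i in ids if ids.count(i) > 1}
    if dupes:
        raise ValueError(f"duplicate task ids: {sorted(dupes)}")
    return ids


if __name__ == "__main__":
    print(make_task_id("object1"))  # execute_queries_object1
    print(check_unique(["object1", "object2"]))
```

Inside the loop, `task_id=make_task_id(obj)` would then replace the f-string.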
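Each query file can contain multiple queries. BigQuery runs the statements of one query job in order, as a single multi-statement query (script). For example:

```sql
SELECT ... FROM `{{project_id}}.dataset.table` where element = '{{param2}}' ...;
SELECT ... FROM table2 where ...;
```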
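If you would rather keep a Python list of queries, as in the question, one possible workaround for the DECLARE restriction is to wrap each query in its own `BEGIN ... END;` block before joining them. This relies on the assumption that BigQuery accepts DECLARE at the start of a block as well as at the start of the script, and that variables declared in one block are not visible in the next. It is a different technique from the file-based answer above, and `combine_as_script` is a hypothetical helper:

```python
from typing import Iterable


def combine_as_script(queries: Iterable[str]) -> str:
    """Join several BigQuery statements into one multi-statement query.

    Each query is wrapped in its own BEGIN...END block so that a DECLARE
    inside it sits at the start of a block (assumed BigQuery rule).
    Assumes no query ends with a trailing "--" comment.
    """
    blocks = []
    for query in queries:
        body = query.strip()
        if not body:
            continue  # skip empty entries
        if not body.endswith(";"):
            body += ";"  # statements inside a block must end with ';'
        blocks.append(f"BEGIN\n{body}\nEND;")
    return "\n".join(blocks)


if __name__ == "__main__":
    print(combine_as_script([
        "SELECT 1",
        "DECLARE x INT64 DEFAULT 2;\nSELECT x;",
    ]))
```

The resulting string would go into `configuration["query"]["query"]` of a single BigQueryInsertJobOperator.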
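In this example:

- I created a `queries` folder in the DAG folder.
- In the DAG, I configured the list of objects.
- For each object, the query file path follows the folder structure: `queries/<object>/query.sql`.
- Each query file is loaded as a Jinja2 template and rendered with its params.
- Each rendered query is passed to its own BigQueryInsertJobOperator.
- The DAG executes the BigQueryInsertJobOperator tasks, and therefore each object's queries, sequentially via the `chain` helper.

I hope this solution will help.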