Tags: python, google-bigquery, airflow, directed-acyclic-graphs, google-cloud-composer

How to execute multiple queries through BigQuery operators in an Apache Airflow DAG on Google Cloud Composer?


I need to execute multiple queries through a single BigQuery operator inside my DAG, which runs on Google Cloud Composer. The DAG will have many operators, and each operator needs to execute multiple queries sequentially. I tried giving a list of queries to BigQueryInsertJobOperator, but it executes only the first query in the list and silently ignores the rest; it does not raise any error either.

I cannot use the trick of combining all the queries into a single string, because my 2nd or 3rd queries can contain DECLARE statements, which will fail if I concatenate multiple queries into one string variable. Also, if I create the operator in a loop, how can I set the dependencies, since I will have many operators and each operator will have multiple queries?
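
A simplified sketch of what I tried (the query strings here are placeholders for my real queries):

    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

    query_1 = "SELECT 1"  # placeholder queries
    query_2 = "SELECT 2"
    query_3 = "SELECT 3"

    execute_queries = BigQueryInsertJobOperator(
        task_id='execute_queries',
        configuration={
            "query": {
                # Passing a list here: only the first query runs,
                # the rest are ignored and no error is raised
                "query": [query_1, query_2, query_3],
                "useLegacySql": False,
            }
        },
    )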


Solution

  • You can create a DAG with the following structure:

    project
         dag_folder
              queries
                  object1 
                        query.sql
                  object2  
                        query.sql
    

    And then in your DAG, you can use this kind of code:

    import os
    
    import airflow
    from airflow.models.baseoperator import chain
    from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
    from jinja2 import Template
    
    
    def get_jinja_template(file_path: str) -> Template:
        with open(file_path) as fp:
            return Template(fp.read())
    
    
    with airflow.DAG(
            "your_dag",
            default_args=your_args,
            schedule_interval=None) as dag:
        # You can configure this list for your DAG, for example with Airflow
        # Variables (see the sketch after this code block) or elsewhere if you prefer
        your_objects = [
            'object1',
            'object2'
        ]
    
        dag_folder = os.getenv("DAGS_FOLDER")
    
        query_operators = []
        for obj in your_objects:
            # Matches the queries/<object>/query.sql layout shown above
            query_path = f'{dag_folder}/queries/{obj}/query.sql'
    
            query = get_jinja_template(query_path).render(
                project_id='project',
                param2='param2'
            )
    
            execute_query = BigQueryInsertJobOperator(
                # task_id must be unique within the DAG, so derive it from the object
                task_id=f'execute_query_{obj}',
                configuration={
                    "query": {
                        "query": query,
                        "useLegacySql": False,
                    }
                },
                location='EU'
            )
    
            query_operators.append(execute_query)
    
        # chain wires the operators in list order: op1 >> op2 >> ...
        chain(*query_operators)
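
    As the comment in the code suggests, the object list can also come from an Airflow Variable instead of being hard-coded. A minimal sketch, assuming a Variable named your_objects that holds a JSON list:

        from airflow.models import Variable

        # Assumes a Variable "your_objects" set to e.g. ["object1", "object2"];
        # default_var keeps the DAG parseable if the Variable is not defined yet.
        your_objects = Variable.get(
            "your_objects",
            deserialize_json=True,
            default_var=["object1", "object2"],
        )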
    

    Each query file can contain multiple queries, for example:

    SELECT ... FROM `{{project_id}}.dataset.table` WHERE element = '{{param2}}' ...;
    SELECT ... FROM table2 WHERE ...;
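
    On the DECLARE concern from the question: each operator submits its whole file as a single job, so BigQuery runs it as one multi-statement script. DECLARE is allowed at the top of such a script, and also at the start of a BEGIN...END block if you need a variable further down the file. A sketch with illustrative names:

        DECLARE min_date DATE DEFAULT CURRENT_DATE();

        SELECT element FROM `{{project_id}}.dataset.table`
        WHERE element = '{{param2}}' AND load_date >= min_date;

        BEGIN
          -- DECLARE is also valid at the start of a BEGIN...END block
          DECLARE row_count INT64 DEFAULT (SELECT COUNT(*) FROM `{{project_id}}.dataset.table2`);
          SELECT row_count;
        END;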
    
    • I used a structure with a list of objects in the queries folder
    • In the DAG, I configured this list of objects
    • I loop over this list
    • For each object, I load the associated query from the DAG structure and queries folder: queries/object/query.sql
    • I render this query with a Jinja2 template and params
    • I pass this query to the BigQueryInsertJobOperator
    • I create a list of BigQueryInsertJobOperator tasks
    • Each query file can contain multiple SQL queries
    • The DAG executes each BigQueryInsertJobOperator and its queries sequentially, object by object, via the chain helper (see the sketch below)
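
    For completeness, chain from airflow.models.baseoperator simply wires the tasks in list order, which is what makes the objects run one after another. With illustrative task names, these two lines are equivalent:

        from airflow.models.baseoperator import chain

        # chain(...) sets the same dependencies as the >> form below
        chain(task_1, task_2, task_3)
        task_1 >> task_2 >> task_3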

    I hope this solution will help.