Tags: airflow, airflow-scheduler

How to run the same tasks in parallel in Airflow?


I want to run the same set of tasks in parallel for different sources, viz. [A, B, C, D]. The following tasks are identical for all the sources; currently I have repeated the same code four times and then run the copies in parallel, which only inflates the line count. I'd like a better, more modular way to run the same task (same logic for every source) in parallel and reduce my lines of code.

        delete_A_table = BigQueryDeleteTableOperator(
            task_id="delete_A_table",
            deletion_dataset_table=projectId+"."+dataSetId+".daily_A",
            ignore_if_missing=True
        )

        create_A_ext_table = BigQueryCreateExternalTableOperator(
            task_id="create_A_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": 'projectId1',
                    "datasetId": 'datasetId1',
                    "tableId": "daily_A",
                },
                "externalDataConfiguration": {
                    "sourceUris": "gs://loc/source_data_A"+bucket_path,
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )

I run the following to achieve parallelism:

delete_C_table >> create_C_ext_table >> [trigger_C, trigger_daily_vge_report]
delete_B_table >> create_B_ext_table
delete_D_table >> create_D_ext_table

TIA.

== UPDATE: NEW CODE ==

This is the actual code, with just a few table names replaced.

import os
from datetime import datetime, timedelta
from airflow import models
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator, \
    BigQueryDeleteTableOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator

extract_day = int(models.Variable.get('day'))
ext_table_names = models.Variable.get('ext_table_names')

extract_date = (datetime.now() - timedelta(days=extract_day)).strftime('%Y-%m-%d')

dte = datetime.now() - timedelta(days=extract_day)
year = dte.year
mth = dte.month
day = dte.day

if day < 10:
    day = "0" + str(day)

if mth < 10:
    mth = "0" + str(mth)

bucket_path = "/year=" + str(year) + "/month=" + str(mth) + "/day=" + str(day) + "/*"

projectId = "projectId1"
dataSetId = "datasetid"

default_dag_args = {
    'start_date': datetime(2022, 3, 4),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

with models.DAG(
        'GCS_to_BQ_ExternalTableForLoop',
        default_args=default_dag_args,
        catchup=False,
        schedule_interval='0 2 * * *'
        # schedule_interval=None
) as dag:
    
    for table_name in ('A', 'B', 'C', 'D'):
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True
        )
    
        create_table_ext_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": f"gs://loc-{table_name}/data_{table_name}{bucket_path}",
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )
        
    trigger_dag_a = TriggerDagRunOperator(
         task_id='trigger_DAG_A',
         trigger_dag_id='DAG-A',
         execution_date='{{ ds }}',
         reset_dag_run=True
    )
    
    trigger_dag_b = TriggerDagRunOperator(
         task_id='trigger_DAG_B',
         trigger_dag_id='DAG-B',
         execution_date='{{ ds }}',
         reset_dag_run=True
    )

Now if I set the dependencies as follows:

delete_table_task >> create_table_ext_task >> [trigger_dag_a, trigger_dag_b]

it gives me this graph (NOT what I need): [graph screenshot]

What I want is as follows:

    delete_A_table >> create_A_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
    delete_C_table >> create_C_ext_table >> [trigger_DAG_C, trigger_DAG_D, Different_DAG]
    delete_B_table >> create_B_ext_table >> [Different_DAG]
    delete_D_table >> create_D_ext_table >> [Different_DAG]

This is what I need: [graph screenshot]


Solution

  • If I understand correctly, you have a fixed number of tables and you want to run the same flow per table in parallel without duplicating code; something like this should work well for you:

    for table_name in ["A", "B"]:
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True
        )
    
        create_ext_table_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": 'projectId1',
                    "datasetId": 'datasetId1',
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": f"gs://loc/source_data_{table_name}{bucket_path}",
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                    "skipLeadingRows": 1,
                },
            },
        )
    
        delete_table_task >> create_ext_table_task
    

    In the case of table C, where you have an extra flow, you can always add a condition inside the loop to attach the extra flow only where it is needed.
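
    For example, here is a minimal sketch of that idea. The three trigger tasks (trigger_dag_c, trigger_dag_d, trigger_different_dag) and their trigger_dag_id values are illustrative placeholders for your actual TriggerDagRunOperator tasks; they are created once before the loop so every iteration wires into the same task instances:

    trigger_dag_c = TriggerDagRunOperator(
        task_id="trigger_DAG_C", trigger_dag_id="DAG-C",
        execution_date="{{ ds }}", reset_dag_run=True,
    )
    trigger_dag_d = TriggerDagRunOperator(
        task_id="trigger_DAG_D", trigger_dag_id="DAG-D",
        execution_date="{{ ds }}", reset_dag_run=True,
    )
    trigger_different_dag = TriggerDagRunOperator(
        task_id="trigger_Different_DAG", trigger_dag_id="Different-DAG",
        execution_date="{{ ds }}", reset_dag_run=True,
    )

    for table_name in ("A", "B", "C", "D"):
        delete_table_task = BigQueryDeleteTableOperator(
            task_id=f"delete_{table_name}_table",
            deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
            ignore_if_missing=True,
        )

        create_ext_table_task = BigQueryCreateExternalTableOperator(
            task_id=f"create_{table_name}_ext_table",
            table_resource={
                "tableReference": {
                    "projectId": projectId,
                    "datasetId": dataSetId,
                    "tableId": f"daily_{table_name}",
                },
                "externalDataConfiguration": {
                    "sourceUris": [f"gs://loc-{table_name}/data_{table_name}{bucket_path}"],
                    "sourceFormat": "AVRO",
                    "compression": "NONE",
                },
            },
        )

        # every table flows into the Different_DAG trigger ...
        downstream = [trigger_different_dag]
        # ... but only A and C also fan out to DAG-C and DAG-D
        if table_name in ("A", "C"):
            downstream += [trigger_dag_c, trigger_dag_d]

        delete_table_task >> create_ext_table_task >> downstream

    This mirrors the dependency picture you described: every table ends in the Different_DAG trigger, and only A and C additionally trigger DAG-C and DAG-D.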

    EDIT based on the comments: If you want all the tasks in the loop to be downstream/upstream of other tasks, you have two options:

    1. Use a TaskGroup; see the sketch right after this list.
    2. You can create the downstream task before the loop and write delete_table_task >> create_ext_table_task >> downstream_task in the loop.
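
    A minimal sketch of option 1, using Airflow 2's TaskGroup (the group ids and the shared downstream trigger below are illustrative, not taken from your DAG). Each table's delete/create pair becomes one collapsible node in the graph view, and the whole group is wired upstream of the shared trigger:

    from airflow.utils.task_group import TaskGroup

    # illustrative downstream task that should run after every table's group
    trigger_report = TriggerDagRunOperator(
        task_id="trigger_report", trigger_dag_id="Different-DAG",
        execution_date="{{ ds }}", reset_dag_run=True,
    )

    for table_name in ("A", "B", "C", "D"):
        # one group per table: delete -> create
        with TaskGroup(group_id=f"load_{table_name}") as load_group:
            delete_table_task = BigQueryDeleteTableOperator(
                task_id="delete_table",  # rendered as load_<table>.delete_table
                deletion_dataset_table=f"{projectId}.{dataSetId}.daily_{table_name}",
                ignore_if_missing=True,
            )
            create_ext_table_task = BigQueryCreateExternalTableOperator(
                task_id="create_ext_table",
                table_resource={
                    "tableReference": {
                        "projectId": projectId,
                        "datasetId": dataSetId,
                        "tableId": f"daily_{table_name}",
                    },
                    "externalDataConfiguration": {
                        "sourceUris": [f"gs://loc-{table_name}/data_{table_name}{bucket_path}"],
                        "sourceFormat": "AVRO",
                        "compression": "NONE",
                    },
                },
            )
            delete_table_task >> create_ext_table_task

        # the whole group is upstream of the shared trigger
        load_group >> trigger_report

    Inside a TaskGroup the task_ids are automatically prefixed with the group_id, so the per-table suffix in the task_id is no longer needed to keep ids unique. Option 2 is the same wiring without the group: create the trigger task once before the loop and end every iteration with delete_table_task >> create_ext_table_task >> trigger_report.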