Search code examples
Tags: bigdata, airflow, airflow-taskflow

Airflow dynamically generated tasks do not run in order


I have created a DAG that generates tasks dynamically. The tasks are generated correctly, but they are not triggered in order and do not run consistently. I have noticed that they are triggered in alphanumeric (string-sorted) order. Take the run_modification_ tasks as an example: I generated tasks 0 to 29, and they are triggered in this order: run_modification_0 run_modification_1 run_modification_10 run_modification_11 run_modification_12 run_modification_13 run_modification_14 run_modification_15 run_modification_16 run_modification_17 run_modification_18 run_modification_19 run_modification_2 run_modification_21 run_modification_23....

But I need them to run in task order, like:
run_modification_0 run_modification_1 run_modification_2 run_modification_3 run_modification_4 run_modification_5..

Please help me run these tasks in the order in which they were created.

Please see the attached image of the DAG run.

from datetime import date, timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable
import os

# Default arguments applied to every task attached to ``dag``.
args = dict(owner='Airflow', start_date=days_ago(2))

# No schedule: the DAG only runs when triggered.
dag = DAG(
    dag_id='tastOrder',
    tags=['task'],
    schedule_interval=None,
    default_args=args,
)

# Jinja template for the modification_processXcom_<i> BashOperators: ``cd`` into
# the folder name that run_modification_<i> pushed to XCom under the key
# 'taskDateFolder'. ``params.i`` is the string index passed via ``params``.
modification_processXcom = " cd {{ ti.xcom_pull(task_ids='run_modification_'+params.i, key='taskDateFolder') }}  "


def modificationProcess(ds, **kwargs):
    """Compute the folder date for task ``i`` and publish it via XCom.

    The date is ``i`` days before the fixed base day 2021-01-01.

    :param ds: Airflow ``ds`` context value; not used.
    :param kwargs: must contain ``i`` (int or numeric string) and ``ti``
        (the task instance used for ``xcom_push``).
    """
    base_day = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    offset = int(str(kwargs['i']))
    folder_date = str(base_day - timedelta(days=offset))
    print(folder_date)
    # Downstream BashOperators read this value back with xcom_pull.
    kwargs["ti"].xcom_push("taskDateFolder", folder_date)



def getDays(days=30):
    """Return the length of the look-back window and its end date.

    The end date is fixed at 2021-01-01. The first element is always
    ``timedelta(days=days)``; its ``.days`` decides how many task pairs the
    DAG generates.

    :param days: length of the look-back window in days (default 30, which
        matches the original hard-coded value).
    :returns: tuple ``(day_Diff, today)`` of ``(timedelta, date)``.
    """
    today = datetime.strptime('2021-01-01', '%Y-%m-%d').date()
    window_start = today - timedelta(days=days)
    day_Diff = today - window_start
    return day_Diff, today

# Generate one run_modification_<i> -> modification_processXcom_<i> pair per
# day in the look-back window, from the highest index down to 0.
day_Diff, today = getDays()
for i in reversed(range(0, day_Diff.days)):
    run_modification = PythonOperator(
        task_id='run_modification_' + str(i),
        provide_context=True,
        python_callable=modificationProcess,
        op_kwargs={'i': str(i)},
        dag=dag,
    )

    # Use a distinct name for the operator. Rebinding
    # ``modification_processXcom`` here would overwrite the module-level Jinja
    # template, so every iteration after the first would receive a
    # BashOperator object as ``bash_command`` instead of the template string.
    process_xcom = BashOperator(
        task_id='modification_processXcom_' + str(i),
        bash_command=modification_processXcom,
        params={'i': str(i)},
        dag=dag,
    )

    run_modification >> process_xcom

Solution

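  • Airflow does not run independent tasks in the order in which they were created; the only way to enforce an order is to declare dependencies between them. To get the dependency chain: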

    run_modification_1 -> modification_processXcom_1 -> 
    run_modification_2 -> modification_processXcom_2 -> ... - > 
    run_modification_29 -> modification_processXcom_29
    

    You can do:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    
    dag = DAG(
        dag_id='my_dag',
        start_date=datetime(2021, 8, 10),
        schedule_interval=None,  # runs only when triggered
        catchup=False,
        is_paused_upon_creation=False,  # not paused when first created
    )
    
    mylist1 = []  # run_modification_<i> tasks, in creation order
    mylist2 = []  # modification_processXcom_<i> tasks, in creation order
    for i in range(1, 30):
        mylist1.append(
            BashOperator(  # Replace with your requested operator
                task_id=f'run_modification_{i}',
                bash_command=f"""echo executing run_modification_{i}""",
                dag=dag,
            )
        )
        mylist2.append(
            BashOperator(  # Replace with your requested operator
                task_id=f'modification_processXcom_{i}',
                bash_command=f"""echo executing modification_processXcom_{i}""",
                dag=dag,
            )
        )
        # Wire the dependencies inside the loop. If they are set after the
        # loop, only the final pair gets linked and the rest run unordered.
        # run_modification_<i> -> modification_processXcom_<i>
        mylist1[-1] >> mylist2[-1]
        if len(mylist1) > 1:
            # modification_processXcom_<i-1> -> run_modification_<i>
            mylist2[-2] >> mylist1[-1]
    

    This code creates two lists of operators and chains them so that they run one after another, as shown in the graph view: [image: graph view]

    Tree view: enter image description here