python, airflow, airflow-scheduler

How to run same dag two times in a single run in Airflow


I am absolutely new to Airflow. I have a requirement where I have to run two EMR jobs. Currently I have a Python script that depends on some input files; if they are present, it triggers an EMR job.

My new requirement is that I will have two different input files (of the same type), and these two files will be the inputs to the EMR jobs. In both cases Spark will do the same thing; only the input file differs.

# Airflow 2.x import path; on Airflow 1.10 the operator lives in
# airflow.contrib.operators.emr_create_job_flow_operator instead.
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

# Spins up an EMR cluster (job flow) using the job_flow_args overrides
create_job_workflow = EmrCreateJobFlowOperator(
    task_id='some-task',
    job_flow_overrides=job_flow_args,
    aws_conn_id=aws_conn,
    emr_conn_id=emr_conn,
    dag=dag
)

How can I achieve this, i.e. run the same DAG flow twice, changing only the input file inside spark-submit? Basically, whenever I do 'trigger DAG' it should take the two different input files and trigger two different EMR jobs on two different EMR clusters. Or can anyone please suggest a best practice for doing this? Is it somehow possible by setting max_active_runs=2?


Solution

  • Another proper option, I believe, would be to use SubDAGs:

    redefine your main processing flow as a SubDAG, then create a new 'general DAG' that simply contains two SubDagOperator tasks, one per run, each passing a different input filename as a parameter. See the Airflow documentation on SubDAGs for details and an example; a minimal sketch follows below.
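
Here is a minimal sketch of that layout, assuming Airflow 2.x with the Amazon provider package installed. The bucket paths, step definition, connection ids, and DAG ids are hypothetical placeholders for your own values (your existing job_flow_args would go where make_job_flow_overrides builds its dict):

from datetime import datetime

from airflow import DAG
from airflow.operators.subdag import SubDagOperator
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

PARENT_DAG_ID = 'emr_two_inputs'
DEFAULT_ARGS = {'start_date': datetime(2023, 1, 1)}


def make_job_flow_overrides(input_file):
    # Hypothetical overrides: reuse your existing job_flow_args here and
    # just inject the per-run input file into the spark-submit step args.
    return {
        'Name': 'emr-job-' + input_file.split('/')[-1],
        'Steps': [{
            'Name': 'spark-step',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-submit', 's3://my-bucket/job.py', input_file],
            },
        }],
    }


def emr_subdag(child_id, input_file):
    # A SubDAG's dag_id must be '<parent_dag_id>.<subdag_task_id>'.
    subdag = DAG(
        dag_id=f'{PARENT_DAG_ID}.{child_id}',
        default_args=DEFAULT_ARGS,
        schedule_interval=None,
    )
    EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=make_job_flow_overrides(input_file),
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
        dag=subdag,
    )
    return subdag


with DAG(PARENT_DAG_ID, default_args=DEFAULT_ARGS, schedule_interval=None) as dag:
    # One SubDagOperator per input file: triggering the parent DAG once
    # launches two EMR job flows, one per file, on two separate clusters.
    for child_id, input_file in [
        ('run_file_a', 's3://my-bucket/input_a.csv'),
        ('run_file_b', 's3://my-bucket/input_b.csv'),
    ]:
        SubDagOperator(
            task_id=child_id,
            subdag=emr_subdag(child_id, input_file),
        )

With this layout a single 'trigger DAG' fans out into both SubDAGs in parallel, which matches the "one trigger, two EMR clusters" requirement without touching max_active_runs (that setting controls how many concurrent runs of the same DAG may exist, not parallel work within one run).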