airflow, airflow-scheduler

Airflow Dynamic Task Creation Stream Setting Not Working


I have a complex DAG that essentially repeats its stream six times, once for each of six different sources. So I've been using a for loop to dynamically create my streams, like this (small example):

sources = ['source_1', 'source_2', 'source_3', 'source_4', 'source_5', 'source_6']

for source in sources:

    source_task_1 = PythonOperator(
        task_id=source + '_create_emr',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_arl_source_emr_creation,
        op_args=[source])

    source_task_2 = BashOperator(
        task_id=source + '_starting_step_1',
        retries=10,
        bash_command='echo "Starting step 1 for ' + source + '"',
        dag=dag)

    source_task_2.set_upstream(source_task_1)

All of the tasks are successfully created in the DAG, because I can see them all in the Airflow UI, but the strange thing is that it only links the tasks in the stream for the first occurrence in the loop (source_1).

All of the other tasks have no upstream or downstream set on them. I don't understand how this is possible; since the first occurrence in the loop worked, shouldn't they all work?

Here is the actual code I have (it's a very large DAG, so I'm only showing the tasks, not the Python callable functions I'm using inside the tasks...):

def create_emr_step_3_subdag(main_dag, subdag_id, source):
    subdag = DAG('{0}.{1}'.format(main_dag.dag_id, subdag_id), default_args=args)

    source_run_emr_step_3 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_3',
        dag=subdag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_3,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_3_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_3_waiter',
        dag=subdag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_3_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_3_waiter.set_upstream(source_run_emr_step_3)

    return subdag

class DatalakeDigitalPlatformArlWorkflowSource:
    sourceShortName = None  # source_1, source_2, source_3, source_4, source_5, source_6
    sourceFullName = None  # SOURCE_1, SOURCE_2, SOURCE_3, SOURCE_4, SOURCE_5, SOURCE_6

    def getSourceShortName(self):
        return self.sourceShortName

    def setSourceShortName(self, sourceShortName):
        self.sourceShortName = sourceShortName

    def getSourceFullName(self):
        return self.sourceFullName

    def setSourceFullName(self, sourceFullName):
        self.sourceFullName = sourceFullName

source_1 = DatalakeDigitalPlatformArlWorkflowSource()
source_1.setSourceShortName("source_1")
source_1.setSourceFullName("SOURCE_1")

source_2 = DatalakeDigitalPlatformArlWorkflowSource()
source_2.setSourceShortName("source_2")
source_2.setSourceFullName("HZN")

source_3 = DatalakeDigitalPlatformArlWorkflowSource()
source_3.setSourceShortName("source_3")
source_3.setSourceFullName("SOURCE_3")

source_4 = DatalakeDigitalPlatformArlWorkflowSource()
source_4.setSourceShortName("source_4")
source_4.setSourceFullName("SOURCE_4")

source_5 = DatalakeDigitalPlatformArlWorkflowSource()
source_5.setSourceShortName("source_5")
source_5.setSourceFullName("PP")

source_6 = DatalakeDigitalPlatformArlWorkflowSource()
source_6.setSourceShortName("source_6")
source_6.setSourceFullName("SOURCE_6")

sources = [source_1, source_2, source_3, source_4, source_5, source_6]

for source in sources:
    source_create_emr_task_id = source.sourceFullName + '_create_emr'

    source_create_emr = PythonOperator(
        task_id=source_create_emr_task_id,
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_blah_source_emr_creation,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_starting_step_1
    source_starting_step_1 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_1',
        retries=10,
        bash_command='echo "Starting step 1 for ' + source.sourceShortName + '"',
        dag=dag)

    # Get source Batch ID
    source_get_batch_id = PythonOperator(
        task_id=source.sourceFullName + '_get_batch_id',
        retries=10,
        dag=dag,
        python_callable=get_batch_id,
        op_args=[airflow_home + '/resources/batch-id-inputs/batchid_input.json', source.sourceFullName])

    # source_licappts
    source_sensor_licappts = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_licappts',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_licappts_(.*).txt',
        poke_interval=3,
        execution_timeout=timedelta(hours=23),
        dag=dag)
    source_process_licappts = PythonOperator(
        task_id=source.sourceFullName + '_process_licappts',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_licappts_(.*).txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_agents
    source_sensor_agents = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_agents',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_agents_(.*).txt',
        poke_interval=3,
        dag=dag)
    source_process_agents = PythonOperator(
        task_id=source.sourceFullName + '_process_agents',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_agents_*.txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_agentpolicy
    source_sensor_agentpolicy = OmegaFileSensor(
        task_id=source.sourceFullName + '_sensor_agentpolicy',
        retries=10,
        filepath=airflow_home + '/foo/data/bar/blah/test/data',
        filepattern=source.sourceShortName + '_agentpolicy_(.*).txt',
        poke_interval=3,
        dag=dag)
    source_process_agentpolicy = PythonOperator(
        task_id=source.sourceFullName + '_process_agentpolicy',
        retries=10,
        dag=dag,
        python_callable=execute_d_landing_import,
        op_args=[source.sourceShortName + '_agentpolicy_*.txt', 'get' + source.sourceFullName + 'BatchId'])

    # source_finished_step_1
    source_finished_step_1 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_1',
        retries=10,
        bash_command='echo "Finished step 1 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_2
    source_starting_step_2 = BashOperator(
        task_id=source.sourceFullName + '_source_starting_step_2',
        retries=10,
        bash_command='echo "Starting step 2 for ' + source.sourceShortName + '"',
        dag=dag)

    source_run_emr_step_2 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_2',
        dag=dag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_2,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_2_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_2_waiter',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_2_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_elastic_search_check
    source_elastic_search_check = PythonOperator(
        task_id=source.sourceFullName + '_elastic_search_check',
        retries=10,
        dag=dag,
        python_callable=execute_get_advisor_batch_stage_status,
        op_args=['get' + source.sourceFullName + 'BatchId', source.sourceFullName])

    # source_finished_step_2
    source_finished_step_2 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_2',
        retries=10,
        bash_command='echo "Finished step 2 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_3
    source_starting_step_3 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_3',
        retries=10,
        bash_command='echo "Starting step 3 for ' + source.sourceShortName + '"',
        dag=dag)

    source_emr_step_3_subdag_task_id = source.sourceFullName + '_emr_step_3_subdag'

    source_emr_step_3_subdag = SubDagOperator(
        task_id=source_emr_step_3_subdag_task_id,
        dag=dag,
        retries=10,
        pool='entitymatching_task_pool',
        subdag=create_emr_step_3_subdag(dag, source_emr_step_3_subdag_task_id, source)
    )

    # source_finished_step_3
    source_finished_step_3 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_3',
        retries=10,
        bash_command='echo "Finished step 3 for ' + source.sourceShortName + '"',
        dag=dag)

    # source_starting_step_4
    source_starting_step_4 = BashOperator(
        task_id=source.sourceFullName + '_starting_step_4',
        retries=10,
        bash_command='echo "Starting step 4 for ' + source.sourceShortName + '"',
        dag=dag)

    source_run_emr_step_4 = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_4',
        dag=dag,
        provide_context=True,
        retries=0,
        python_callable=execute_emr_step_4,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    source_run_emr_step_4_waiter = PythonOperator(
        task_id=source.sourceFullName + '_run_emr_step_4_waiter',
        dag=dag,
        provide_context=True,
        retries=10,
        python_callable=execute_emr_step_4_waiter,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_finished_step_4
    source_finished_step_4 = BashOperator(
        task_id=source.sourceFullName + '_finished_step_4',
        retries=10,
        bash_command='echo "Finished step 4 for ' + source.sourceShortName + '"',
        dag=dag)

    source_emr_termination = PythonOperator(
        task_id=source.sourceFullName + '_emr_termination',
        dag=dag,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(minutes=5),
        python_callable=execute_emr_termination,
        op_args=[source_create_emr_task_id, source.sourceFullName])

    # source_successful
    source_successful = BashOperator(
        task_id=source.sourceFullName + '_successful',
        retries=10,
        bash_command='sudo aws sns publish blah blah blah',
        dag=dag)

    # finished_foo_bar_blah_workflow
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)

    ### Stream ###

    # Create EMR Cluster
    source_create_emr.set_upstream(starting_foo_bar_blah_workflow)

    # Step 1
    source_starting_step_1.set_upstream(starting_foo_bar_blah_workflow)
    source_get_batch_id.set_upstream(source_starting_step_1)

    source_sensor_licappts.set_upstream(source_get_batch_id)
    source_process_licappts.set_upstream(source_sensor_licappts)

    source_sensor_agents.set_upstream(source_get_batch_id)
    source_process_agents.set_upstream(source_sensor_agents)

    source_sensor_agentpolicy.set_upstream(source_get_batch_id)
    source_process_agentpolicy.set_upstream(source_sensor_agentpolicy)

    source_finished_step_1.set_upstream(source_process_licappts)
    source_finished_step_1.set_upstream(source_process_agents)
    source_finished_step_1.set_upstream(source_process_agentpolicy)

    # Step 2
    source_starting_step_2.set_upstream(source_finished_step_1)
    source_starting_step_2.set_upstream(source_create_emr)  # Don't run EMR steps until the EMR is created
    source_run_emr_step_2.set_upstream(source_starting_step_2)
    source_run_emr_step_2_waiter.set_upstream(source_run_emr_step_2)
    source_elastic_search_check.set_upstream(source_run_emr_step_2_waiter)
    source_finished_step_2.set_upstream(source_elastic_search_check)

    # Step 3
    source_starting_step_3.set_upstream(source_finished_step_2)
    source_emr_step_3_subdag.set_upstream(source_starting_step_3)
    source_finished_step_3.set_upstream(source_emr_step_3_subdag)

    # Step 4
    source_starting_step_4.set_upstream(source_finished_step_3)
    source_run_emr_step_4.set_upstream(source_starting_step_4)
    source_run_emr_step_4_waiter.set_upstream(source_run_emr_step_4)
    source_finished_step_4.set_upstream(source_run_emr_step_4_waiter)

    # Terminate EMR Cluster
    source_emr_termination.set_upstream(source_finished_step_4)
    source_successful.set_upstream(source_emr_termination)
    finished_foo_bar_blah_workflow.set_upstream(source_successful)

And as you can see, the stream linking is not working:

[Screenshot: DAG graph view showing the broken streams, with most tasks unlinked]

Before my recent modifications to the file it was working just fine, as can be seen here:

[Screenshot: DAG graph view from before the refactoring, with every source's stream correctly linked]

I just did a ton of refactoring on my code, and when I reloaded it I saw this error. I'm not sure what I did; I used a lot of find-and-replace-all to rename things, and I'm wondering if I messed something up in that process and just don't see the error in the code. But what makes me think that isn't the issue is this: if that were the issue, why would my first source be correctly linked in its stream?

Is it possible I've hit some sort of limit on the number of tasks I can have in a single DAG?


Solution

  • I think I found your mistake:

    First, to rule out that this is an Airflow bug, I created a small DAG that creates 25 tasks for 7 sources and sets the upstreams, and everything works fine.
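
    Roughly, that test DAG looked like the sketch below; this is only an illustration, it assumes a dag object already exists, and the source names and task ids here are made up:

    from airflow.operators.bash_operator import BashOperator

    test_sources = ['src_{}'.format(i) for i in range(7)]  # 7 dummy sources

    for s in test_sources:
        previous = None
        for i in range(25):  # a chain of tasks per source, wired up in a loop
            t = BashOperator(
                task_id='{}_task_{}'.format(s, i),
                bash_command='echo "{} task {}"'.format(s, i),
                dag=dag)
            if previous is not None:
                t.set_upstream(previous)
            previous = t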

    So I took your code, tried it, and experienced the exact same problem that you see.
    Then I went ahead, commented out all of the lines where you set the upstreams, and put them back in step by step; everything works fine until the very last line goes back in:

    finished_foo_bar_blah_workflow.set_upstream(source_successful)
    

    So I looked at the task finished_foo_bar_blah_workflow, and as far as I can see this task only needs to be created once, not once per source. So I put the code

    # finished_foo_bar_blah_workflow
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)
    

    above the for source in sources: line, and voila, everything works fine.
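
    In other words, after the move the end of the file looks roughly like this (a trimmed-down sketch; only the tasks relevant to the fix are shown, everything else stays as in your code):

    # Created exactly once, before the loop
    finished_foo_bar_blah_workflow = BashOperator(
        task_id='finished_foo_bar_blah_workflow',
        bash_command='echo "Finished foo_bar_blah_workflow"',
        dag=dag)

    for source in sources:
        # ... all of the per-source tasks are created here, exactly as before ...

        source_successful = BashOperator(
            task_id=source.sourceFullName + '_successful',
            retries=10,
            bash_command='sudo aws sns publish blah blah blah',
            dag=dag)

        # ... the per-source upstream wiring, exactly as before ...

        # Every source's final task now feeds the single shared downstream task
        finished_foo_bar_blah_workflow.set_upstream(source_successful)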

    Edit
    So I looked into the upstream and downstream lists of the tasks that are supposed to be directly upstream of finished_foo_bar_blah_workflow. While finished_foo_bar_blah_workflow has only one task in its upstream_list (before I moved the code that creates that task; afterwards it correctly contains 7 tasks), all the tasks that are supposed to be directly upstream of it do reference it in their downstream_list, and their own upstream_list contains the tasks that should be there too. So this might be a bug concerning creating a task with the same task_id multiple times.
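
    You can check this yourself by inspecting the relationship lists once the DAG file has been parsed; a quick sketch, using the dag and sources objects from the code above:

    finished = dag.get_task('finished_foo_bar_blah_workflow')
    # Only one upstream task while finished_foo_bar_blah_workflow is created inside the loop;
    # after moving its creation above the loop it lists one entry per source
    print([t.task_id for t in finished.upstream_list])

    for source in sources:
        successful = dag.get_task(source.sourceFullName + '_successful')
        # Each *_successful task still references finished_foo_bar_blah_workflow downstream
        print([t.task_id for t in successful.downstream_list])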