airflow, directed-acyclic-graphs, airflow-2.x

In Apache Airflow, if two branch tasks lead to the same downstream task, only the second branch triggers it


I have the following tasks:

...
    check_files_task = task.branch(task_id="check_files")(check_files)([data["xls_file_path"], data["xls_feature_columns_file"], data["xls_fields_columns_file"]], "check_translations", "no_files")
    check_pending_translation = task.branch(task_id="check_translations")(check_files)([data["pending_feature_translations_file_path"]], "translations_pending", "xls_to_features")
    check_pending_translation2 = task.branch(task_id="check_translations2")(check_files)([data["pending_feature_translations_file_path"]], "translations_pending", "xls_to_products")

    no_files_task = EmptyOperator(task_id="no_files")

    translations_pending_task = EmailOperator(
        task_id="translations_pending", 
        to=emails, 
        subject="Vsehub translations pending", 
        html_content=translation_instructions, 
        files=[data['pending_feature_translations_file_path']])
...

The last two arguments of the check_files function are the task IDs of the two possible branches. Both the check_pending_translation and check_pending_translation2 tasks can return "translations_pending":

skipmixin_key   {'followed': ['translations_pending']}
return_value    translations_pending

I connect the tasks in the following way:

    check_files_task >> [no_files_task, check_pending_translation]
    check_pending_translation >> [translations_pending_task, xls_to_features_task]
    xls_to_features_task >> translate_features_task >> check_pending_translation2
    check_pending_translation2 >> [translations_pending_task, xls_to_products_task]
    xls_to_products_task >> translate_products_task

The issue is this. Good: if execution reaches check_pending_translation2 and it returns "translations_pending", translations_pending_task is executed. Bad: if check_pending_translation itself returns "translations_pending", translations_pending_task is skipped and no branch is executed.


Expected: if check_pending_translation returns "translations_pending", translations_pending_task should execute. Actual: all branches are skipped.


Solution

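  • The default trigger rule is all_success, which runs a task only when every one of its direct upstream tasks has succeeded. translations_pending has two upstream tasks: check_translations and check_translations2. When check_translations chooses translations_pending, the xls_to_features branch is skipped, and that skip propagates down the chain to check_translations2. Given that translations_pending then depends on a task that was skipped, I believe it was skipped in turn.

    I suspect that you just need to set the trigger_rule parameter of the translations_pending task to one_success, so that it runs as soon as either branch task selects it.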
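
To make the reasoning concrete, here is a minimal pure-Python sketch of how the two trigger rules treat the upstream states in this DAG. It is a simplified model written for illustration, not Airflow's actual scheduler code: the function name `evaluate` and the returned labels are hypothetical, and it assumes all upstream tasks have already finished.

```python
from collections import Counter


def evaluate(trigger_rule, upstream_states):
    """Simplified model (not Airflow code): decide what happens to a task
    once all of its direct upstream tasks have finished."""
    counts = Counter(upstream_states)
    success = counts["success"]
    skipped = counts["skipped"]
    failed = counts["failed"] + counts["upstream_failed"]

    if trigger_rule == "all_success":
        # Every upstream task must succeed; one skipped parent skips the task.
        if failed:
            return "upstream_failed"
        if skipped:
            return "skipped"
        return "run"

    if trigger_rule == "one_success":
        # A single successful upstream task is enough.
        if success:
            return "run"
        return "upstream_failed" if failed else "skipped"

    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# check_translations picked translations_pending ("success"), while
# check_translations2 was skipped because xls_to_features was not followed.
upstream_of_translations_pending = ["success", "skipped"]

print(evaluate("all_success", upstream_of_translations_pending))  # skipped
print(evaluate("one_success", upstream_of_translations_pending))  # run
```

In the DAG itself, the change would be to pass trigger_rule="one_success" to the EmailOperator that defines translations_pending (or the equivalent constant TriggerRule.ONE_SUCCESS from airflow.utils.trigger_rule). When both branch tasks succeed, as in the path that already works, both rules let the task run.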