Tags: airflow, airflow-scheduler, google-cloud-composer, airflow-2.x

Reading XCOM and Airflow variables probably slows down Airflow (in Google Cloud Composer)


We are trying to merge daily (CSV) extract files into our Data Warehouse.

In our use case the DAGs' Python code is the same for all of our DAGs (~2000), so we generate them with DAG generator logic from a single Python file. Each DAG has only 15 tasks (5 dummy tasks, 2 CloudDataFusionStartPipelineOperator tasks, 8 Python tasks).

During the DAG generation process we read Airflow Variables (~30-50) to determine what DAGs to generate (this also determines the IDs of the DAGs and the schema/table names they should handle). We call these Generator Variables.

During the DAG generation process the DAGs also read their configuration by their IDs (2-3 more Airflow Variables per generated DAG). We call these Configurator Variables.

Unfortunately, our DAGs have to handle some arguments passed in via the REST API and a lot of dynamically calculated information passed between the tasks, so we rely on Airflow's XCom functionality. This means a tremendous number of reads from Airflow's DB.

Where possible, we use user-defined macros to configure the tasks so that the database reads (the XCom pulls) are delayed until the task is executed, but it still puts a heavy load on Airflow (Google Cloud Composer): approximately 50 pulls from XCom.

Questions:

  • Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?
  • How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?
  • Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?

XCOM pull example:

Metadata = PythonOperator(
    task_id         = TASK_NAME_PREFIX__METADATA + str(dag_id),
    python_callable = metadataManagment,
    op_kwargs       = {
        'dag_id'           : dag_id,
        'execution_case'   : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="execution_case_for_metadata") }}',
        'date'             : '{{ ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date") }}',
        'enc_path'         : '{{ get_runtime_arg("RR", dag_run, "encryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'dec_path'         : '{{ get_runtime_arg("RR", dag_run, "decryptedfilepath", ti.xcom_pull(task_ids="' + TASK_NAME_PREFIX__MANAGE_PARAMS + dag_id + '", key="folder_date")) }}',
        'aggr_project_name': ast.literal_eval(AIRFLOW_ENVIRONMENT_VARIABLES)['aggr_project_name'],
    },
    provide_context = True,
    trigger_rule    = TriggerRule.ALL_DONE
)

Example Generator Airflow Variables:

key: STD_SCHEMA_NAMES
val: [('SCHEMA1', 'MAIN'), ('SCHEMA2', 'MAIN'), ('SCHEMA2', 'SECONDARY')]

key: STD_MAIN_SCHEMA1_INSERT_APPEND_TABLES
val: ['SCHEMA1_table_1', 'SCHEMA1_table_2', 'SCHEMA1_table_3', ... ]

key: STD_MAIN_SCHEMA1_SCD2_TABLES
val: ['SCHEMA1_table_i', 'SCHEMA1_table_j', 'SCHEMA1_table_k', ... ]

key: STD_MAIN_SCHEMA2_SCD2_TABLES
val: ['SCHEMA2_table_l', 'SCHEMA2_table_m', 'SCHEMA2_table_n', ... ]

key: STD_SECONDARY_SCHEMA2_TRUNCATE_LOAD_TABLES
val: ['SCHEMA2_table_x', 'SCHEMA2_table_y', 'SCHEMA2_table_z', ... ]

DAG generator example:

# DAG_TYPE = STD
env_vars                                = Variable.get('environment_variables')

airflow_var_name__dag_typed_schema_name = '_'.join([x for x in [DAG_TYPE, 'SCHEMA_NAMES'] if x])
table_types                             = ['INSERT_APPEND', 'TRUNCATE_LOAD', 'SCD1', 'SCD2']

list_of_schemas_with_group              = ast.literal_eval(Variable.get(airflow_var_name__dag_typed_schema_name, '[]'))
tuples_of_var_names                     = [(x[0], x[1], y, '_'.join([z for z in [DAG_TYPE, x[1], x[0], y, 'TABLES'] if z])) for x in list_of_schemas_with_group for y in table_types]
list_of_tables                          = [(x[0], x[1], x[2], ast.literal_eval(Variable.get(x[3], 'None'))) for x in tuples_of_var_names]
list_of_tables                          = [(x[0], x[1], x[2], x[3]) for x in list_of_tables if x[3] and len(x[3]) > 0]


for schema_name, namespace_group, table_type, table_names_with_schema_prefix in list_of_tables:
    for table_name in table_names_with_schema_prefix:

        dag_id = str(table_name)
        globals()[dag_id] = create_dag( dag_id,
                                        schedule,
                                        default_dag_args,
                                        schema_name,
                                        table_type,
                                        env_vars,
                                        tags )

Solution

  • Is Airflow's Database designed for this high number of reads (of Airflow Variables and mainly values from XCOM)?

    Yes, but the code you shared is abusive. You are calling Variable.get() in top-level code. This means that every time the .py file is parsed, Airflow executes each Variable.get(), and each call opens a session to the DB. Assuming you didn't change the default (min_file_process_interval), the file is re-parsed every 30 seconds, so every top-level Variable.get() runs again each time.

    To put it into numbers: you mentioned ~30-50 Generator Variables plus 2-3 Configurator Variables for each of your ~2000 DAGs, all read while parsing a single file. That comes to roughly 4,000-6,000 Variable.get() calls to the database every 30 seconds. This is very abusive.

    If you wish to use variables in top-level code, you should use environment variables rather than Airflow Variables. This is explained in the "Dynamic DAGs with environment variables" section of the Airflow documentation.
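As a minimal sketch of that approach, the generator could read its table lists from an environment variable instead of calling Variable.get() at parse time. The variable name DAG_GENERATOR_CONFIG, its JSON layout, and the helper load_generator_config are assumptions made for illustration; they are not part of Airflow or of the code in the question:

```python
import json
import os


def load_generator_config(environ=os.environ, var_name="DAG_GENERATOR_CONFIG"):
    """Return (schema, group, table_type, table_name) tuples from an env var.

    Reading os.environ at parse time does not open a metadata DB session,
    unlike Variable.get(). The variable name and JSON layout are hypothetical.
    """
    raw = environ.get(var_name, "[]")
    rows = []
    for entry in json.loads(raw):
        for table_name in entry.get("tables", []):
            rows.append(
                (entry["schema"], entry["group"], entry["table_type"], table_name)
            )
    return rows


# Example value, as it might be set on the environment (hypothetical content).
example_env = {
    "DAG_GENERATOR_CONFIG": json.dumps([
        {
            "schema": "SCHEMA1",
            "group": "MAIN",
            "table_type": "SCD2",
            "tables": ["SCHEMA1_table_i", "SCHEMA1_table_j"],
        },
    ])
}

for schema_name, group, table_type, table_name in load_generator_config(example_env):
    # In the real generator this is where create_dag(...) would be called.
    print(schema_name, group, table_type, table_name)
```

With around 2000 tables, a single environment variable may become unwieldy; the same parsing logic could presumably read a JSON file deployed next to the DAG file instead, which also avoids the database at parse time.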

    Note that Airflow also offers the option of defining a custom Secrets Backend as an alternative source for Variables.

  • How should we redesign our code if there is a high number of dynamically calculated fields and metadata we have to pass between the tasks?

    Airflow can handle high volumes. The issue is more with how you wrote the DAG. If you have concerns about the XCom table, or would prefer to store the values somewhere else, Airflow supports custom XCom backends.
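Separately from a custom backend, one possible way to reduce the number of XCom reads is to push related values as a single dictionary and pull it once inside the callable, rather than issuing one templated xcom_pull per field (the example in the question pulls folder_date three times for one task). The sketch below is only an illustration under assumptions: FakeTaskInstance is a stand-in so the code runs without Airflow, and the key run_params, the function names, and the sample values are hypothetical:

```python
class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance that counts XCom reads."""

    def __init__(self):
        self._store = {}
        self.pull_count = 0

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        # The stand-in ignores task_ids; a real TaskInstance would
        # query the metadata DB on every call made here.
        self.pull_count += 1
        return self._store.get(key)


def manage_params(ti):
    # Push everything the downstream tasks need under one key.
    # The values are placeholders, not taken from the question.
    ti.xcom_push(key="run_params", value={
        "execution_case_for_metadata": "FULL",
        "folder_date": "2022-01-31",
    })


def metadata_management(ti, upstream_task_id):
    # One pull for the whole bundle instead of one pull per field.
    params = ti.xcom_pull(task_ids=upstream_task_id, key="run_params")
    return params["execution_case_for_metadata"], params["folder_date"]


ti = FakeTaskInstance()
manage_params(ti)
result = metadata_management(ti, upstream_task_id="manage_params")
print(result, ti.pull_count)
```

In a real DAG the same pattern would use the task instance from the task context; whether it helps depends on how many of the ~50 pulls read values produced by the same upstream task.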

  • Should we simply accept the fact that there is a heavy load on DB in this type of use case and simply scale the DB up vertically?

    From your description, there are things you can do to improve the situation. Airflow is tested against high volumes of DAGs and tasks (with both vertical and horizontal scaling). If you find evidence of a performance issue, you can report it by opening a GitHub issue in the project.