Airflow deprecation warning: "Invalid arguments were passed"


I have the following code on Airflow 1.9:

import_op = MySqlToGoogleCloudStorageOperator(
    task_id='import',
    mysql_conn_id='oproduction',
    google_cloud_storage_conn_id='gcpm',
    provide_context=True,
    approx_max_file_size_bytes=100000000,  # 100MB per file
    sql='import.sql',
    params={'next_to_import': NEXT_TO_IMPORT, 'table_name': TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name_orders,
    dag=dag)

Why does it generate this warning:

/usr/local/lib/python2.7/dist-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'provide_context': True} category=PendingDeprecationWarning

What is the problem with provide_context? As far as I know, it is needed in order to use params.


Solution

  • provide_context is not needed for params.

    The params parameter (a dict) can be passed to any operator.

    You would mostly use provide_context with PythonOperator and BranchPythonOperator. A good example is https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator.

    MySqlToGoogleCloudStorageOperator has no provide_context parameter, so the argument falls through to **kwargs and triggers the PendingDeprecationWarning. Removing provide_context=True from the operator call gets rid of the warning; params keeps working.
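
    To make the mechanism concrete, here is a minimal sketch in plain Python (no Airflow required). SketchBaseOperator, SketchSqlExportOperator and build_and_collect_warnings are hypothetical names made up for illustration, and the warning text is abbreviated; this only approximates how a base class can warn about keyword arguments that no subclass declared, and it is not Airflow's actual source.

```python
import warnings


class SketchBaseOperator:
    """Hypothetical stand-in for a base operator; not Airflow's real class."""

    def __init__(self, task_id, params=None, *args, **kwargs):
        # Anything that no subclass consumed ends up here and triggers a warning.
        if args or kwargs:
            warnings.warn(
                "Invalid arguments were passed to {c}. Invalid arguments were: "
                "*args: {a} **kwargs: {k}".format(
                    c=self.__class__.__name__, a=args, k=kwargs),
                category=PendingDeprecationWarning,
            )
        self.task_id = task_id
        self.params = params or {}


class SketchSqlExportOperator(SketchBaseOperator):
    """Hypothetical operator that declares `sql` but not `provide_context`."""

    def __init__(self, sql, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sql = sql


def build_and_collect_warnings(**operator_kwargs):
    """Build the sketch operator and return (operator, list of warning messages)."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        op = SketchSqlExportOperator(**operator_kwargs)
    return op, [str(w.message) for w in caught]
```

    Calling build_and_collect_warnings(task_id='import', sql='import.sql', params={...}, provide_context=True) records one warning that names provide_context, while the same call without provide_context records none. In both cases params is stored on the operator, which mirrors the point that params does not depend on provide_context.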

    The PythonOperator docstring describes provide_context as follows:

    if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.

    The operator's source code contains the following:

    if self.provide_context:
        context.update(self.op_kwargs)
        context['templates_dict'] = self.templates_dict
        self.op_kwargs = context
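
    The effect of that branch can be tried outside Airflow with a small sketch. call_with_context, describe_run and fake_context are hypothetical names, and the context here is a hand-made dict with only three keys, so treat this as an approximation of the behaviour rather than the operator's implementation.

```python
def call_with_context(python_callable, context, op_kwargs=None,
                      templates_dict=None, provide_context=False):
    """Mimic the provide_context branch; all names here are illustrative."""
    op_kwargs = dict(op_kwargs or {})
    if provide_context:
        merged = dict(context)      # copy, so the caller's dict stays untouched
        merged.update(op_kwargs)    # explicit op_kwargs win on key clashes
        merged['templates_dict'] = templates_dict
        op_kwargs = merged
    return python_callable(**op_kwargs)


def describe_run(ds, **kwargs):
    # `ds` is bound by name; every other context key lands in kwargs.
    return ds, sorted(kwargs)


# Hand-made stand-in for the context Airflow would build for a task run.
fake_context = {'ds': '2018-01-01', 'run_id': 'manual__1', 'params': {}}
```

    With provide_context=True, describe_run receives ds by name and the remaining keys through **kwargs. Without it, the callable gets only op_kwargs, so a function that requires ds fails with a TypeError.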
    

    So in simple terms, with provide_context=True the operator passes the following dictionary, plus templates_dict, to the function given in python_callable:

    {
        'END_DATE': ds,
        'conf': configuration,
        'dag': task.dag,
        'dag_run': dag_run,
        'ds': ds,
        'ds_nodash': ds_nodash,
        'end_date': ds,
        'execution_date': self.execution_date,
        'latest_date': ds,
        'macros': macros,
        'params': params,
        'run_id': run_id,
        'tables': tables,
        'task': task,
        'task_instance': self,
        'task_instance_key_str': ti_key_str,
        'test_mode': self.test_mode,
        'ti': self,
        'tomorrow_ds': tomorrow_ds,
        'tomorrow_ds_nodash': tomorrow_ds_nodash,
        'ts': ts,
        'ts_nodash': ts_nodash,
        'yesterday_ds': yesterday_ds,
        'yesterday_ds_nodash': yesterday_ds_nodash,
    }
    

    So this can be used in the function as follows:

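    from pprint import pprint

    def print_context(ds, **kwargs):
        # With provide_context=True the context arrives as keyword arguments:
        # `ds` is bound by name, everything else lands in kwargs.
        pprint(kwargs)
        ti = kwargs['task_instance']
        exec_date = kwargs['execution_date']
        print(ds)
        return 'Whatever you return gets printed in the logs'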
    
    
    run_this = PythonOperator(
        task_id='print_the_context',
        provide_context=True,
        python_callable=print_context,
        dag=dag,
    )