I have the following code on Airflow 1.9:
import_op = MySqlToGoogleCloudStorageOperator(
    task_id='import',
    mysql_conn_id='oproduction',
    google_cloud_storage_conn_id='gcpm',
    provide_context=True,
    approx_max_file_size_bytes=100000000,  # 100 MB per file
    sql='import.sql',
    params={'next_to_import': NEXT_TO_IMPORT, 'table_name': TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name_orders,
    dag=dag)
Why does it generate this warning:
/usr/local/lib/python2.7/dist-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to MySqlToGoogleCloudStorageOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'provide_context': True} category=PendingDeprecationWarning
What is the problem with provide_context? To the best of my knowledge it is needed for the usage of params.
provide_context is not needed for params. The params parameter (dict type) can be passed to any Operator.
You would mostly use provide_context with PythonOperator and BranchPythonOperator. A good example is https://airflow.readthedocs.io/en/latest/howto/operator.html#pythonoperator.
MySqlToGoogleCloudStorageOperator has no provide_context parameter, so the argument ends up in **kwargs and you get the deprecation warning. Removing provide_context=True from the call makes the warning go away.
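The mechanism behind the warning can be imitated without Airflow. The following is a minimal sketch, not Airflow's actual code: BaseOperatorSketch, MySqlToGcsSketch and warnings_for are hypothetical stand-ins, and only the idea (a base class warning about keyword arguments that no subclass consumed) is taken from the warning text in the question.

```python
import warnings


class BaseOperatorSketch(object):
    """Hypothetical stand-in for a base operator; not Airflow's real class."""

    def __init__(self, task_id, params=None, *args, **kwargs):
        # Anything still left in args/kwargs was not consumed by any subclass.
        if args or kwargs:
            warnings.warn(
                "Invalid arguments were passed to {}. Invalid arguments were: "
                "*args: {} **kwargs: {}".format(type(self).__name__, args, kwargs),
                category=PendingDeprecationWarning,
            )
        self.task_id = task_id
        self.params = params or {}


class MySqlToGcsSketch(BaseOperatorSketch):
    """Hypothetical operator that declares `sql` but no `provide_context`."""

    def __init__(self, sql, *args, **kwargs):
        super(MySqlToGcsSketch, self).__init__(*args, **kwargs)
        self.sql = sql


def warnings_for(**operator_kwargs):
    """Build the sketch operator and return the warning messages it raised."""
    with warnings.catch_warnings(record=True) as caught:
        # PendingDeprecationWarning is hidden by default, so show everything.
        warnings.simplefilter("always")
        MySqlToGcsSketch(**operator_kwargs)
    return [str(w.message) for w in caught]
```

Calling warnings_for(task_id='import', sql='import.sql', provide_context=True) yields one message that mentions provide_context, while the same call without that argument yields none. params is accepted either way, because the base class declares it.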
If you check the docstring of PythonOperator for provide_context:
if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define **kwargs in your function header.
It has the following code if you check the source code:
if self.provide_context:
    context.update(self.op_kwargs)
    context['templates_dict'] = self.templates_dict
    self.op_kwargs = context
So in simple terms, it passes the following dictionary, together with templates_dict, to the function you pass in python_callable:
{
    'END_DATE': ds,
    'conf': configuration,
    'dag': task.dag,
    'dag_run': dag_run,
    'ds': ds,
    'ds_nodash': ds_nodash,
    'end_date': ds,
    'execution_date': self.execution_date,
    'latest_date': ds,
    'macros': macros,
    'params': params,
    'run_id': run_id,
    'tables': tables,
    'task': task,
    'task_instance': self,
    'task_instance_key_str': ti_key_str,
    'test_mode': self.test_mode,
    'ti': self,
    'tomorrow_ds': tomorrow_ds,
    'tomorrow_ds_nodash': tomorrow_ds_nodash,
    'ts': ts,
    'ts_nodash': ts_nodash,
    'yesterday_ds': yesterday_ds,
    'yesterday_ds_nodash': yesterday_ds_nodash,
}
So this can be used in the function as follows:
from pprint import pprint

def print_context(ds, **kwargs):
    # ds is matched by name; all other context entries arrive in kwargs
    pprint(kwargs)
    ti = kwargs['task_instance']
    exec_date = kwargs['execution_date']
    print(ds)
    return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)
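How the context reaches the callable can also be tried without a running scheduler. This is a rough sketch under stated assumptions: build_op_kwargs is a hypothetical helper that mirrors the "if self.provide_context:" snippet quoted above (except that it copies the context instead of updating it in place), and fake_context is a hand-made dictionary with only a few of the keys, not what Airflow would really supply.

```python
def build_op_kwargs(context, op_kwargs, templates_dict, provide_context):
    """Hypothetical helper mirroring the quoted provide_context branch."""
    if not provide_context:
        return dict(op_kwargs)
    merged = dict(context)        # copy so the caller's context is untouched
    merged.update(op_kwargs)      # user-supplied op_kwargs win on key clashes
    merged['templates_dict'] = templates_dict
    return merged


def print_context(ds, **kwargs):
    # `ds` is pulled out by name; every other context key lands in kwargs.
    return 'ds={} table={}'.format(ds, kwargs['params']['table_name'])


# Hand-made stand-in for the context; the values are illustrative only.
fake_context = {
    'ds': '2018-01-01',
    'params': {'table_name': 'orders'},
    'task_instance': None,
}

result = print_context(**build_op_kwargs(fake_context, {}, None, True))
```

With provide_context switched off, the helper returns only op_kwargs, so a callable that requires ds would fail with a TypeError for the missing argument. That is the practical difference the flag makes for a PythonOperator callable.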