I am very new to Airflow and have read a large part of its documentation. From the documentation, I understand that small amounts of data can be shared between components (tasks) of a DAG using the XCom class: the component that publishes the data must push it, and the component that consumes the data must pull it.
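The push/pull idea can be pictured with a toy, in-memory model. This is not Airflow's implementation: real XCom values are rows in Airflow's metadata database and are also scoped by DAG and DAG run. The class `ToyXCom` and its methods are hypothetical and only mirror the shape of `xcom_push`/`xcom_pull`.

```python
# Toy model only: ToyXCom is hypothetical, not part of Airflow.
# Real XComs live in the metadata database and are also scoped by DAG and
# DAG run; this sketch keys values by (task_id, key) only.


class ToyXCom:
    def __init__(self):
        self._store = {}  # (task_id, key) -> value

    def push(self, task_id, key, value):
        # The publishing task stores a value under its own task id and a key.
        self._store[(task_id, key)] = value

    def pull(self, task_id, key="return_value"):
        # The consuming task looks the value up by the publisher's task id and
        # key. The default key mirrors the key Airflow uses for values that a
        # task returns ('return_value'). Missing entries give None.
        return self._store.get((task_id, key))


xcom = ToyXCom()
xcom.push("pusher", "current_time", "2017-10-26T12:00:00")
print(xcom.pull("pusher", "current_time"))  # prints 2017-10-26T12:00:00
print(xcom.pull("pusher", "missing_key"))   # prints None
```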
However, I am not clear on the syntax for pushing and pulling. Following the XCom section of the documentation, I developed a code template. Suppose I have the following code with only two components, a pusher and a puller. The pusher publishes the current time, which the puller must consume and write to a log file.
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log_file_location = '/usr/local/airflow/logs/time_log.log'

default_args = {'owner': 'apache'}

dag = DAG('pushpull', default_args=default_args)

def push_function():
    # push this data on the DAG as key-value pair
    return(datetime.now())  # current time

def pull_function():
    with open(log_file_location, 'a') as logfile:
        current_time = ''  # pull data from the pusher as key - value pair
        logfile.writelines('current time = ' + current_time)
        logfile.close()

with dag:
    t1 = PythonOperator(
        task_id='pusher',
        python_callable=push_function)
    t2 = PythonOperator(
        task_id='puller',
        python_callable=pull_function)
    t2.set_upstream(t1)
I need help from Airflow experts with two pieces of syntax: how to push the current time in push_function, and how to pull it in pull_function.
Thanks!
Example of pushing to XCom with a key:
def push_function(**context):
    msg = 'the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)
Example of pulling from XCom with a key:
def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task', key='the_message')
    print("received message: '%s'" % msg)
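Because these callables are plain Python functions, one way to check the push/pull pair without a scheduler is to call them with a stand-in task instance. `FakeTaskInstance` below is hypothetical: it implements only `xcom_push` and `xcom_pull` with the keyword arguments used above and does not reproduce Airflow's per-run storage. The `return msg` line is added only so the round trip can be checked; inside Airflow it would also push `msg` under the default `return_value` key, which is harmless.

```python
# FakeTaskInstance is a hypothetical stand-in for Airflow's TaskInstance;
# it supports only the two XCom methods the callables use.


class FakeTaskInstance:
    def __init__(self, task_id, store):
        self.task_id = task_id
        self.store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        # Store the value under this task's id and the given key.
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        # Look up the value another task pushed; None if absent.
        return self.store.get((task_ids, key))


def push_function(**context):
    msg = 'the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)


def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task', key='the_message')
    print("received message: '%s'" % msg)
    return msg  # added for checking the round trip


store = {}
push_ti = FakeTaskInstance('push_task', store)
pull_ti = FakeTaskInstance('pull_task', store)

# Airflow's context exposes the task instance as both 'ti' and 'task_instance'.
push_function(ti=push_ti, task_instance=push_ti)
received = pull_function(ti=pull_ti, task_instance=pull_ti)
```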
Example DAG:
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# Named 'dag' so the DAG class is not shadowed by the instance.
dag = DAG(
    dag_id='simple_xcom',
    start_date=datetime(2017, 10, 26),
    schedule_interval=timedelta(1)  # one day
)

def push_function(**context):
    msg = 'the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_function,
    provide_context=True,  # Airflow 1.x: pass the context (incl. task instance) as kwargs
    dag=dag)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task', key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_function,
    provide_context=True,
    dag=dag)

push_task >> pull_task
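Applied to the question's template, a minimal sketch could rely on the fact that a value returned from a PythonOperator callable is pushed to XCom under the key `return_value`, which is also the default key for `xcom_pull`. Passing the log path through `op_kwargs` and returning an ISO-format string (to avoid serializing a `datetime` object) are design choices of this sketch, not requirements. `FakeTaskInstance` is hypothetical and only lets the callables run outside Airflow; the DAG wiring is shown as comments.

```python
# Sketch of the question's pusher/puller. FakeTaskInstance is a hypothetical
# stand-in used only to run the callables outside Airflow.
import os
import tempfile
from datetime import datetime


def push_function(**context):
    # The return value is pushed to XCom under the key 'return_value'.
    return datetime.now().isoformat()


def pull_function(log_path, **context):
    ti = context['ti']
    # key defaults to 'return_value', i.e. whatever 'pusher' returned.
    current_time = ti.xcom_pull(task_ids='pusher')
    with open(log_path, 'a') as logfile:  # the with block closes the file
        logfile.write('current time = %s\n' % current_time)


# In the DAG file (Airflow 1.x imports, as in the question):
#   t1 = PythonOperator(task_id='pusher', python_callable=push_function)
#   t2 = PythonOperator(task_id='puller', python_callable=pull_function,
#                       op_kwargs={'log_path': log_file_location},
#                       provide_context=True)
#   t1 >> t2


class FakeTaskInstance:
    def __init__(self, xcoms):
        self.xcoms = xcoms  # (task_id, key) -> value

    def xcom_pull(self, task_ids, key='return_value'):
        return self.xcoms.get((task_ids, key))


# Simulate Airflow: run the pusher, store its return value, run the puller.
xcoms = {('pusher', 'return_value'): push_function()}
log_path = os.path.join(tempfile.mkdtemp(), 'time_log.log')
pull_function(log_path, ti=FakeTaskInstance(xcoms))
with open(log_path) as f:
    print(f.read().strip())
```

In Airflow 2 the import becomes `airflow.operators.python`, and `provide_context` is no longer needed because the context is passed to callables that accept `**kwargs`.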