I am writing a DAG and I ran into a problem when trying to extract an XCom value from a task. I would like to achieve something like this: a function func that reads the parameters of the manually triggered dag_run from {{ dag_run.conf }}, e.g. {"dlf_40_start_at":"2022-01-01", "dlf_40_end_at":"2022-01-02"}, and pushes the values to XCom:
def func(**kwargs):
    dag_run_conf = kwargs["dag_run"].conf
    task_instance: TaskInstance = kwargs.get("task_instance") or kwargs.get("ti")
    task_instance.xcom_push(key='current_load_window_start', value=dag_run_conf['dlf_40_start_at'])
    task_instance.xcom_push(key='current_load_window_end', value=dag_run_conf['dlf_40_end_at'])
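For reference, that conf is what I supply when triggering the run manually. A minimal sketch of such a trigger with Airflow's local API client (the dag_id below is taken from my log output and the run_id format is arbitrary):

from datetime import datetime
from airflow.api.client.local_client import Client

client = Client(None, None)
client.trigger_dag("scv_matching_process_closure_dag",  # dag_id, adjust to your DAG
                   run_id=datetime.now().strftime("%d_%m_%Y_%H_%M_%S"),
                   conf={"dlf_40_start_at": "2022-01-01", "dlf_40_end_at": "2022-01-02"})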
A PythonOperator that calls the function func to push the values to XCom:
extract_manual_config_parameter = PythonOperator(
    task_id='extract_manual_config_parameter',
    python_callable=func,
    dag=dag_capture_min_id,
    provide_context=True  # Remove this if you are using Airflow >=2.0
)
And a user_defined_macros entry, defined as
def get_return_value(a, b):
    print("dlf_40_start_at", a, "dlf_40_end_at", b)
    return [a, b]
which I reference from a Jinja template to get the values back out. Here is my full code:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.api.client.local_client import Client
from airflow.models import Variable, TaskInstance, XCom
from airflow.models.dagrun import DagRun
from airflow.operators.python_operator import PythonOperator
from common.dbt_cloud_plugin.operators.domain_dbt_cloud_job_operator import DomainDbtCloudJobOperator
from common.utils.slack_utils import SlackUtils
from data_product_insights.scv_main_pipeline.utils import snowflake_operator, stage_config, \
    create_common_tasks_for_matching_closure_dag
from data_product_insights.scv_main_pipeline.config import DbtStageConfig
stg = Variable.get('stage')
dbt_config = {
    'dev': DbtStageConfig(dbt_cloud_job_name='scv-ga-data-extraction-dev'),
    'stg': DbtStageConfig(dbt_cloud_job_name='scv-ga-data-extraction-stg'),
    'prd': DbtStageConfig(dbt_cloud_job_name='scv-ga-data-extraction-prd')
}
dbt_stage_config = dbt_config[stg]
def close_process_and_trigger_downstream(**kwargs: dict) -> None:  # pylint: disable=unused-argument
    dag_run: DagRun = kwargs.get("dag_run")
    loop_index = dag_run.conf.get("loop_index", 1)
    process_id = dag_run.conf.get("process_id")
    sql = f"UPDATE SCV_STAGING.ETL_PROCESS SET ETL_END_LOAD_DATE = CURRENT_TIMESTAMP(), " \
          f"IS_NEXT_ETL = FALSE, ETL_STATUS = 'done', MAX_SESSIONID = {loop_index} " \
          f"WHERE PROCESS_ID = {process_id} AND TYPE = 'matching'"
    snowflake_operator(
        task_id="close_matching_process_record",
        sql=sql
    ).execute(kwargs)
    client = Client(None, None)
    client.trigger_dag(stage_config.dag_id_scv_search_event_pipeline,
                       run_id=datetime.now().strftime("%d_%m_%Y_%H_%M_%S"),
                       conf={"process_id": process_id})
def func(**kwargs):
    dag_run_conf = kwargs["dag_run"].conf
    task_instance: TaskInstance = kwargs.get("task_instance") or kwargs.get("ti")
    task_instance.xcom_push(key='current_load_window_start', value=dag_run_conf['dlf_40_start_at'])
    task_instance.xcom_push(key='current_load_window_end', value=dag_run_conf['dlf_40_end_at'])
def get_return_value(a, b):
    print("dlf_40_start_at", a, "dlf_40_end_at", b)
    return [a, b]
with DAG(dag_id=stage_config.dag_id_scv_matching_process_closure_dag,
         description='Matching engine of scv data',
         schedule_interval=None,
         default_args={
             'start_date': stage_config.start_date,
             'retries': stage_config.retries,
             'retry_delay': timedelta(minutes=1),
             'on_failure_callback': SlackUtils(stage_config.slack_connection_id).post_slack_failure,
             'retry_exponential_backoff': stage_config.retry_back_off,
         },
         user_defined_macros={"get_return_value": get_return_value},
         params=stage_config.as_dict()) as dag_capture_min_id:
    extract_manual_config_parameter = PythonOperator(
        task_id='extract_manual_config_parameter',
        python_callable=func,
        dag=dag_capture_min_id,
        provide_context=True  # Remove this if you are using Airflow >=2.0
    )
    return_value = "{{ get_return_value( task_instance.xcom_pull(task_ids='extract_manual_config_parameter', " \
                   "key='current_load_window_start'), task_instance.xcom_pull(" \
                   "task_ids='extract_manual_config_parameter', key='current_load_window_end')) }} "
    close_matching_process_and_trigger_downstream_dags = PythonOperator(
        task_id="close_matching_process_and_trigger_downstream_dags",
        python_callable=close_process_and_trigger_downstream,
        provide_context=True
    )
    dbt_40_dlf = DomainDbtCloudJobOperator(
        task_id='dbt_40_dlf',
        xcom_task_id='extract_manual_config_parameter',
        dbt_cloud_conn_id=dbt_stage_config.dbt_cloud_connection,
        job_name=dbt_stage_config.dbt_cloud_job_name,
        data={
            "cause": "Kicked off from Airflow",
            "git_branch": dbt_stage_config.dbt_job_dev_version,
            "steps_override": ['dbt test'],
            "xcom_value": return_value,
            "haha": "haha"
        },
        dag=dag_capture_min_id
    ).build_operators()
    create_common_tasks_for_matching_closure_dag(
        dag=dag_capture_min_id,
        downstream_task=extract_manual_config_parameter
    )
    extract_manual_config_parameter >> dbt_40_dlf.first
    dbt_40_dlf.last >> close_matching_process_and_trigger_downstream_dags
The log output suggests that the Jinja template is not being evaluated:
INFO - The data is {'cause': 'Kicked off via Airflow: scv_matching_process_closure_dag', 'git_branch': 'development', 'steps_override': ['dbt test'], 'xcom_value': "{{ get_return_value(task_instance.xcom_pull(task_ids='extract_manual_config_parameter', key='current_load_window_start'), task_instance.xcom_pull(task_ids='extract_manual_config_parameter', key='current_load_window_end')) }} ", 'haha': 'haha'}
Am I doing anything wrong here?
XComs work automatically with a return value from a task, so instead of using the DomainDbtCloudJobOperator, you can just return your return_value, and it will be saved as an XCom under the task_id of the task it was returned from.
Then in your next task, you just need to do something like:
my_xcom = kwargs['task_instance'].xcom_pull(
    task_ids='my_previous_task_name')
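Putting that together, here is a minimal, self-contained sketch of the return-value pattern (the dag_id example_return_value_xcom and the task use_window are placeholder names, and it assumes Airflow 2.x, where the context is passed straight into the callable's **kwargs):

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_window(**kwargs):
    conf = kwargs["dag_run"].conf
    # Whatever the callable returns is pushed to XCom automatically
    # under the default key "return_value".
    return [conf["dlf_40_start_at"], conf["dlf_40_end_at"]]

def use_window(**kwargs):
    # Without an explicit key, xcom_pull fetches the upstream task's
    # "return_value" XCom.
    window = kwargs["ti"].xcom_pull(task_ids="extract_manual_config_parameter")
    print("dlf_40_start_at", window[0], "dlf_40_end_at", window[1])

with DAG(dag_id="example_return_value_xcom",
         schedule_interval=None,
         start_date=datetime(2022, 1, 1)) as dag:
    extract = PythonOperator(task_id="extract_manual_config_parameter",
                             python_callable=extract_window)
    consume = PythonOperator(task_id="use_window",
                             python_callable=use_window)
    extract >> consume

No xcom_push and no user_defined_macros: the list returned by extract_window is available downstream, both in callables and in templated fields via {{ ti.xcom_pull(task_ids='extract_manual_config_parameter') }}.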
I think your method would work with some changes, but it's very over-engineered.