Search code examples
airflow

Clearing xcom data with dag_id and execution_date


I am trying to clear XCom values whose execution_date is older than 30 days, but the DAG throws an error saying the execution date is not defined. Please help me understand the issue (Airflow 2.4.3).

The code is as follows:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta,timezone
from sqlalchemy import func

with DAG(dag_id="cleanup_xcom_demo", schedule_interval=None, start_date=days_ago(2)) as dag:

    @provide_session
    def cleanup_xcom(session=None, **context):
        """Delete every XCom row whose execution date is more than 30 days old.

        The filter is not restricted to any dag_id: old XComs of *all* DAGs
        are removed, not only those of ``cleanup_xcom_demo``.

        :param session: SQLAlchemy ORM session; injected by ``provide_session``
            when not supplied (Airflow's ``create_session`` commits it on exit,
            which is what makes the bulk delete persist).
        :param context: Airflow task context passed by ``PythonOperator``;
            accepted for compatibility but not used.
        """
        ts_limit = datetime.now(timezone.utc) - timedelta(days=30)
        # The model class is ``XCom`` (capital C) -- ``Xcom`` raised NameError.
        # Since Airflow 2.2 ``XCom.execution_date`` is an association proxy to
        # ``DagRun.execution_date``, so SQLAlchemy's default
        # ``synchronize_session="evaluate"`` cannot evaluate the criterion in
        # Python; "fetch" selects the matching rows first instead.
        deleted = (
            session.query(XCom)
            .filter(XCom.execution_date <= ts_limit)
            .delete(synchronize_session="fetch")
        )
        print(f"Deleted {deleted} XCom row(s) with execution_date <= {ts_limit}")

    # ``provide_context`` is deprecated (and ignored) in Airflow 2: the context
    # is passed automatically to callables that accept ``**kwargs``.
    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
    )

    start = DummyOperator(task_id="start")
    # none_failed: still run "end" when clean_xcom is skipped, but not if it fails.
    end = DummyOperator(task_id="end", trigger_rule="none_failed")

    start >> clean_xcom >> end


Here is the error:

NameError: name 'Xcom' is not defined

When I remove `Xcom.` from `Xcom.execution_date` and keep only `execution_date` in the filter, it throws an error saying `execution_date` is not defined.

Solution

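  • I replaced the session.query as below and the error went away. The `NameError` came from the typo `Xcom`; the imported class is `XCom`. `synchronize_session='fetch'` is needed because in Airflow 2.2+ `XCom.execution_date` is an association proxy to the DagRun's `execution_date`. SQLAlchemy's default `'evaluate'` strategy cannot evaluate that criterion in Python: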

    # XCom (capital C) is the imported model; 'fetch' is required because
    # execution_date is an association proxy that 'evaluate' cannot handle.
    session.query(XCom).filter(XCom.execution_date <= ts_limit).delete(synchronize_session='fetch')