Get delta of updated records using SqlSensor in Airflow based on last updated time


I'm trying to use the Airflow SqlSensor to detect updated or newly inserted records in a PostgreSQL database. The table has a column holding the timestamp of the last update.

I want to use SqlSensor in Airflow to fetch the newly updated or inserted records. However, I am stuck on which timestamp value to put in the sensor's SQL query.

Here is my code:

from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.sensors.sql import SqlSensor

with DAG(
        dag_id="dag_process_supervisor",
        start_date=datetime(2022, 12, 1),
        catchup=False,
        schedule_interval="@hourly"
) as dag:
    wait_for_table_update = SqlSensor(
        task_id='forecasting_jobs_sensor',
        conn_id='postgres',
        sql='''
          SELECT *
          FROM ForecastingJob
          WHERE last_modified_at > ????;
       ''',
        success=_success_criteria,  # callable defined elsewhere in the DAG file (not shown)
        pass_value=True,
        timeout=5 * 60,  # maximum time, in seconds, that the sensor keeps checking the condition
        poke_interval=60,  # time, in seconds, that the sensor waits between checks
        mode='reschedule'  # if the criteria is not met, the sensor releases its worker slot and is rescheduled
    )

I am not sure what value should replace the ???? or how it would be updated at each poll.


Solution

    1. Add a timestamp column (a flag) in the destination that tracks the last updated time. The destination could be a data warehouse or a file that keeps a record of the updates.
    2. Write a task that runs before the SqlSensor, retrieves the last_update_time, and pushes it to XCom.
    3. In the SqlSensor query, pull the last_update_time from XCom and use it in place of "????".
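
The three steps above amount to a high-water-mark pattern, which can be sketched without Airflow or PostgreSQL. The example below is only an illustration under stated assumptions: it uses in-memory sqlite3 databases as stand-ins for the source and the destination, and the `sync_state` table, the `id` column, and all function names are hypothetical, not taken from the question.

```python
import sqlite3

EPOCH = "1970-01-01 00:00:00"  # watermark used before the first successful run


def get_last_update_time(dest: sqlite3.Connection) -> str:
    """Step 2: read the stored watermark from the destination.
    In Airflow, a PythonOperator callable's return value is pushed to XCom."""
    row = dest.execute("SELECT MAX(last_update_time) FROM sync_state").fetchone()
    return row[0] or EPOCH


def has_new_rows(src: sqlite3.Connection, watermark: str) -> bool:
    """Step 3: the condition a sensor query would test on each poke."""
    (count,) = src.execute(
        "SELECT COUNT(*) FROM ForecastingJob WHERE last_modified_at > ?",
        (watermark,),
    ).fetchone()
    return count > 0


def fetch_delta(src: sqlite3.Connection, watermark: str) -> list:
    """Downstream work: fetch only the rows changed since the watermark."""
    return src.execute(
        "SELECT id, last_modified_at FROM ForecastingJob "
        "WHERE last_modified_at > ? ORDER BY last_modified_at",
        (watermark,),
    ).fetchall()


def save_last_update_time(dest: sqlite3.Connection, rows: list) -> None:
    """Step 1: advance the watermark to the newest processed timestamp."""
    if rows:
        dest.execute(
            "INSERT INTO sync_state (last_update_time) VALUES (?)",
            (rows[-1][1],),
        )
        dest.commit()


def make_demo_databases():
    """Build hypothetical source and destination databases for the demo."""
    src = sqlite3.connect(":memory:")
    src.execute(
        "CREATE TABLE ForecastingJob (id INTEGER PRIMARY KEY, last_modified_at TEXT)"
    )
    src.executemany(
        "INSERT INTO ForecastingJob VALUES (?, ?)",
        [(1, "2022-12-01 10:00:00"), (2, "2022-12-01 11:30:00")],
    )
    dest = sqlite3.connect(":memory:")
    dest.execute("CREATE TABLE sync_state (last_update_time TEXT)")
    return src, dest


if __name__ == "__main__":
    src, dest = make_demo_databases()
    watermark = get_last_update_time(dest)
    if has_new_rows(src, watermark):
        delta = fetch_delta(src, watermark)
        save_last_update_time(dest, delta)
        print(delta)
```

In the DAG itself, the watermark would typically reach the sensor through its templated sql field, for example WHERE last_modified_at > '{{ ti.xcom_pull(task_ids="get_last_update_time") }}', where get_last_update_time is a hypothetical id for the task from step 2. The value is fixed for a given DAG run and only moves forward once a later task stores a newer timestamp, so it changes between runs, not between polls. Note also that SqlSensor only signals that matching rows exist; a downstream task still has to fetch them.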