I'm trying to use an Airflow SqlSensor to detect newly inserted or updated records in a PostgreSQL table. The table has a column, `last_modified_at`, that holds the timestamp of each row's last update.
However, I am stuck on which timestamp value I should put in the SQL query of the sensor.
Here is my code:
```python
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.sensors.sql import SqlSensor

# _success_criteria is my own callable (not shown here); SqlSensor calls it
# with the first cell of the first row returned by the query.

with DAG(
    dag_id="dag_process_supervisor",
    start_date=datetime(2022, 12, 1),
    catchup=False,
    schedule_interval="@hourly",
) as dag:
    wait_for_table_update = SqlSensor(
        task_id='forecasting_jobs_sensor',
        conn_id='postgres',
        sql='''
            SELECT *
            FROM ForecastingJob
            WHERE last_modified_at > ????;
        ''',
        success=_success_criteria,
        timeout=5 * 60,  # maximum total time, in seconds, the sensor keeps checking before it fails
        poke_interval=60,  # time, in seconds, the sensor waits between checks
        mode='reschedule',  # between checks, the sensor releases its worker slot and is rescheduled
    )
```
I am not sure what value to put in place of `????`, or how that value would be updated on each poke.