Is there a way to get the timestamp of the last execution of a PutSQL processor via the REST API? Does such a timestamp even exist, or can I build one myself somehow?
Setup: I use Airflow to trigger my NiFi ETL, which ends with a couple of PutSQL processors. After those are done, I need to execute something else in Airflow.
Idea: I want to trigger the first NiFi processor and then wait in Airflow until the last_execution_timestamp of the last PutSQL processor is updated.
Problem:
I tried accessing the attribute statsLastRefreshed, but it is not the last execution time. It is the last time anything (a user or an API request) accessed the processor, which led NiFi to refresh the processor.
s = processor["status"]["statsLastRefreshed"] # '13:13:26 CEST'
I can't find anything in the REST API documentation of NiFi.
The only other option I see is to make requests from Airflow to the database table of the last PutSQL processor to see if anything new happened there.
I figured out a work-around solution.
1. In a processor, add a custom property named mypropertyname with the value ${now()}. Any flowfile that passes through the processor will then carry, as an attribute, the timestamp of when it passed through the processor.
2. Place an UpdateAttribute processor after the processor from step 1 and set the option Store State (under processor properties) to Store state locally.
3. Add a custom property to the UpdateAttribute processor with the name readable_property and set it to the value ${'mypropertyname'}. The state of the processor now contains the value from the last flowfile (e.g. the timestamp produced by the now() call in step 1).
4. Request the state of the UpdateAttribute processor from /nifi-api/processors/{id}/state (e.g. in Airflow). The JSON which gets returned contains the following lines:
{
  "key": "readable_property",
  "value": "Wed Apr 14 11:13:40 CEST 2021",
  "clusterNodeId": "some-id-0d8eb6052",
  "clusterNodeAddress": "some-host:port-number"
}
Then you just have to parse the JSON for the value in Airflow.
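A minimal Python sketch of that parsing step, which could run inside an Airflow task. It rests on several assumptions that are not stated above: the NiFi instance is reachable without authentication, the entries shown above sit under componentState -> localState -> state in the response, and the function names and base URL are made up for illustration. The time zone abbreviation (CEST) is dropped before parsing because Python's strptime does not reliably understand it, so the result is a naive datetime.

```python
import json
from datetime import datetime
from typing import Optional
from urllib.request import urlopen


def build_state_url(base_url: str, processor_id: str) -> str:
    """Build the state endpoint URL for one processor."""
    return f"{base_url.rstrip('/')}/nifi-api/processors/{processor_id}/state"


def fetch_processor_state(base_url: str, processor_id: str) -> dict:
    """GET the processor state. Assumes an unsecured NiFi instance."""
    with urlopen(build_state_url(base_url, processor_id), timeout=10) as response:
        return json.load(response)


def extract_state_value(state_response: dict, key: str) -> Optional[str]:
    """Return the value stored under `key`, or None if it is absent.

    Assumption: entries live under componentState -> localState/clusterState -> state.
    """
    component_state = state_response.get("componentState") or {}
    for scope in ("localState", "clusterState"):
        entries = (component_state.get(scope) or {}).get("state") or []
        for entry in entries:
            if entry.get("key") == key:
                return entry.get("value")
    return None


def parse_state_timestamp(value: str) -> datetime:
    """Parse e.g. 'Wed Apr 14 11:13:40 CEST 2021', ignoring the zone token."""
    parts = value.split()
    without_zone = " ".join(parts[:4] + parts[5:])
    return datetime.strptime(without_zone, "%a %b %d %H:%M:%S %Y")
```

With a hypothetical base URL such as http://nifi-host:8080, `extract_state_value(fetch_processor_state(base_url, processor_id), "readable_property")` would yield the timestamp string, and `parse_state_timestamp` turns it into a datetime for comparison.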
Note: There will be a slight delay between the previous processor adding the attribute to the flowfile with now() and the moment the flowfile actually passes through the UpdateAttribute processor, from where you can read the timestamp.
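The waiting part on the Airflow side could be sketched as a simple polling loop. This is only an illustration under assumptions: fetch_value is a hypothetical callable that returns the current readable_property value (or None if the state is still empty), and the timeout and interval defaults are arbitrary. The value read before triggering the flow is passed in as previous_value, and the loop returns as soon as a different value shows up.

```python
import time
from typing import Callable, Optional


def wait_for_new_value(
    fetch_value: Callable[[], Optional[str]],
    previous_value: Optional[str],
    timeout_seconds: float = 600.0,
    poll_interval_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until fetch_value() returns something other than previous_value.

    Raises TimeoutError if no new value appears before the deadline.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        current = fetch_value()
        # An empty state (None) does not count as an update.
        if current is not None and current != previous_value:
            return current
        if time.monotonic() >= deadline:
            raise TimeoutError("processor state did not change before the timeout")
        sleep(poll_interval_seconds)
```

Reading previous_value before the first NiFi processor is triggered avoids mistaking the timestamp of an earlier run for the current one.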