Tags: python, apache-nifi

Get last execution timestamp of a NiFi PutSQL processor


Is there a way to get the timestamp of the last execution of a PutSQL processor via the REST API? Does such a timestamp even exist, or can I build one myself somehow?

Setup: I use Airflow to trigger my NiFi ETL, which ends with a couple of PutSQL processors; after those are done, I need to execute something else in Airflow.

Idea: I want to trigger the first NiFi processor and then wait in Airflow until the last_execution_timestamp of the last PutSQL processor is updated.

Problem: I tried accessing the attribute statsLastRefreshed, but it is not the last execution time; it is the last time anything (a user or an API request) accessed the processor and caused NiFi to refresh its stats.

s = processor["status"]["statsLastRefreshed"]  # '13:13:26 CEST'
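
For reference, this is roughly how that processor entity can be fetched in the first place. A minimal sketch, assuming the requests library, an unsecured NiFi instance on localhost and a placeholder processor id:

    import requests

    # Placeholder values; adjust to your NiFi instance and PutSQL processor.
    NIFI_API = "http://localhost:8080/nifi-api"
    PROCESSOR_ID = "id-of-the-PutSQL-processor"

    # Fetch the processor entity; its status block holds statsLastRefreshed.
    resp = requests.get(f"{NIFI_API}/processors/{PROCESSOR_ID}", timeout=10)
    resp.raise_for_status()
    processor = resp.json()

    # Only the time the stats were last refreshed, not the last execution time.
    print(processor["status"]["statsLastRefreshed"])  # e.g. '13:13:26 CEST'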

I can't find anything suitable in the NiFi REST API documentation.

The only other option I see is to query, from Airflow, the database table that the last PutSQL processor writes to and check whether anything new has arrived there.
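
Such a database poll from Airflow could look roughly like this. A sketch, assuming the target database is Postgres and the Postgres provider is installed; the connection id my_conn_id, the table my_target_table and its inserted_at column are hypothetical placeholders:

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    def target_table_has_new_rows(threshold):
        """Return True once the table written by the last PutSQL processor
        contains rows newer than the given threshold timestamp."""
        hook = PostgresHook(postgres_conn_id="my_conn_id")  # hypothetical connection
        row = hook.get_first("SELECT MAX(inserted_at) FROM my_target_table")
        latest = row[0] if row else None
        return latest is not None and latest > threshold

Wrapped in a PythonSensor, Airflow would keep poking this check until it returns True.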


Solution

  • I figured out a workaround.

    1. In a processor add a custom property named mypropertyname with the value ${now()}

    2. Any flowfile that passes through that processor will then carry the timestamp of when it passed through as an attribute named mypropertyname.

    3. Add an UpdateAttribute processor after the processor from step 1 and set its Store State option (under processor properties) to Store state locally.

    4. Add a custom property in the UpdateAttribute processor named readable_property and set its value to ${'mypropertyname'}.

    The state of the processor now contains the value from the last flowfile that passed through it (i.e. the timestamp produced by the now() call from step 1).

    5. Get the value of the stateful processor (and hence the value from the last flowfile that passed through it) via the REST API with a GET on the URI /nifi-api/processors/{id}/state (e.g. from Airflow).

    The JSON which gets returned contains the following lines:

    {
      "key": "readable_property",
      "value": "Wed Apr 14 11:13:40 CEST 2021",
      "clusterNodeId": "some-id-0d8eb6052",
      "clusterNodeAddress": "some-host:port-number"
    }
    

    Then you just have to parse the JSON for the value in Airflow.
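
    In Airflow that could look roughly like the sketch below. It assumes the requests library, an unsecured NiFi instance, a placeholder processor id, and that the entry shown above sits under componentState.localState.state (on a clustered instance it may appear under clusterState instead):

        import requests

        NIFI_API = "http://localhost:8080/nifi-api"
        STATEFUL_PROCESSOR_ID = "id-of-the-stateful-UpdateAttribute"  # placeholder

        def get_last_flowfile_timestamp():
            """Return the value of 'readable_property' from the processor state,
            i.e. the timestamp carried by the last flowfile that passed through."""
            url = f"{NIFI_API}/processors/{STATEFUL_PROCESSOR_ID}/state"
            resp = requests.get(url, timeout=10)
            resp.raise_for_status()
            component_state = resp.json()["componentState"]

            # Standalone nodes store this under localState; clusters may use clusterState.
            entries = (component_state.get("localState") or {}).get("state", [])
            for entry in entries:
                if entry["key"] == "readable_property":
                    return entry["value"]  # e.g. 'Wed Apr 14 11:13:40 CEST 2021'
            return None

    An Airflow task (e.g. a PythonSensor) can then poll this function until the returned timestamp changes, which signals that a new flowfile has reached the end of the flow.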

    Note: There will be a slight delay between the processor from step 1 adding the ${now()} attribute to the flowfile and the moment the flowfile actually passes through the UpdateAttribute processor, from which you read the timestamp.