
How can I timestamp messages in NiFi?


Disclaimer: I know absolutely nothing about NiFi.

I need to receive messages from the ListenHTTP processor and then convert each message into a timestamped JSON message.

So, say I receive the message hello world at 5 am. It should transform it into {"timestamp": "5 am", "message":"hello world"}.

How do I do that?


Solution

  • Each flowfile has attributes, which are pieces of metadata stored as key/value pairs in memory (available for rapid read/write). When any operation occurs, the NiFi framework writes metadata both to the provenance events related to the flowfile and, sometimes, to the flowfile itself. For example, if ListenHTTP is the first processor in the flow, every flowfile that enters the flow will have an entryDate attribute holding the time the flowfile was created, displayed in the form Thu Jan 24 15:53:52 PST 2019. You can read and write these attributes with a variety of processors (e.g. UpdateAttribute, RouteOnAttribute).

    For your use case, you could add a ReplaceText processor immediately after the ListenHTTP processor, with a Search Value of (?s)(^.*$) (the entire flowfile content, i.e. "what you received via the HTTP call") and a Replacement Value of {"timestamp_now":"${now():format('yyyy-MM-dd HH:mm:ss.SSS Z')}", "timestamp_ed": "${entryDate:format('yyyy-MM-dd HH:mm:ss.SSS Z')}", "message":"$1"}
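To make the search/replace mechanics concrete, here is a plain-Java approximation of what that ReplaceText configuration does to one flowfile's content. It is only a sketch, not NiFi's implementation: the class name and the wrap helper are hypothetical, the two timestamps are passed in by the caller instead of coming from now() and entryDate, and it uses the calendar-year pattern yyyy (uppercase YYYY is the week-based year in Java). Like the replacement value, it inserts the captured content without JSON escaping, so a message containing a double quote or a raw newline would produce invalid JSON.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import java.util.regex.Pattern;

public class ReplaceTextSketch {
    // Same search value as the ReplaceText configuration: (?s) lets '.' match
    // newlines, so group 1 captures the entire content.
    private static final Pattern WHOLE_CONTENT = Pattern.compile("(?s)(^.*$)");

    // Hypothetical helper: wraps the content in the JSON template, with the
    // "now" and "entryDate" values supplied by the caller instead of by NiFi.
    static String wrap(String content, Date now, Date entryDate, TimeZone zone) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z", Locale.US);
        fmt.setTimeZone(zone);
        // $1 is a back-reference to the captured content; it is inserted unescaped.
        String replacement = "{\"timestamp_now\":\"" + fmt.format(now) + "\", "
                + "\"timestamp_ed\": \"" + fmt.format(entryDate) + "\", "
                + "\"message\":\"$1\"}";
        return WHOLE_CONTENT.matcher(content).replaceFirst(replacement);
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println(wrap("helloworld", now, now, TimeZone.getDefault()));
    }
}
```

Fed the two instants from the log output at the end of this answer (in a UTC-8 zone), wrap("helloworld", ...) reproduces the same JSON line that LogAttribute printed.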

    The replacement value above provides two timestamp options:

    1. The entryDate is when the flowfile came into existence via the ListenHTTP processor
    2. The now() function returns the current date and time at the moment the expression is evaluated (here, when ReplaceText processes the flowfile)

    Those two values can differ slightly because of processing time and queuing between processors. In my simple example, they were 2 milliseconds apart. You can format either one with the format() function and a standard Java date/time pattern; for example, format('h a') produces "5 AM", so ${now():format('h a'):toLower()} produces "5 am".
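A quick way to check a pattern before putting it into an expression is to run it through java.text.SimpleDateFormat directly. The sketch below assumes format() interprets pattern letters the same way; the class and helper names are made up, and it pins Locale.US and UTC so the results do not depend on the machine. It also shows why lowercase yyyy matters: uppercase YYYY is the week-based year, which for December 30, 2019 already reads 2020.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class PatternSketch {
    // Hypothetical helper: formats an epoch-millisecond instant in UTC with a
    // fixed locale so the output does not depend on the machine's settings.
    static String format(String pattern, long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.US);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long fiveAm = 1548306000000L; // 2019-01-24T05:00:00Z
        long dec30 = 1577707200000L;  // 2019-12-30T12:00:00Z

        // 'h a' gives "5 AM"; lower-casing mirrors the toLower() step.
        System.out.println(format("h a", fiveAm).toLowerCase(Locale.ROOT)); // 5 am

        // yyyy is the calendar year; YYYY is the week-based year.
        System.out.println(format("yyyy-MM-dd", dec30)); // 2019-12-30
        System.out.println(format("YYYY-MM-dd", dec30)); // 2020-12-30
    }
}
```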

    Example

    • ListenHTTP running on port 9999 with path contentListener
    • ReplaceText as above
    • LogAttribute with Log Payload set to true

    [Screenshot: the NiFi flow on the canvas, and a terminal showing the log output and the curl command]

    Curl command: curl -d "helloworld" -X POST http://localhost:9999/contentListener
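If curl isn't handy, the same request can be sent from Java 11+ with java.net.http.HttpClient. This is a sketch that assumes the ListenHTTP processor from the example is listening on localhost:9999 with the path contentListener; the class and helper names are hypothetical. It sets the Content-Type to application/x-www-form-urlencoded because that is what curl sends by default with -d.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PostSketch {
    // Hypothetical helper mirroring:
    // curl -d "helloworld" -X POST http://localhost:9999/contentListener
    static HttpRequest buildRequest(String url, String body) {
        return HttpRequest.newBuilder(URI.create(url))
                // curl -d uses this content type unless told otherwise
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:9999/contentListener", "helloworld");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print whatever status ListenHTTP returns.
        System.out.println(response.statusCode());
    }
}
```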

    Example output (from LogAttribute):

    2019-01-24 16:04:44,529 INFO [Timer-Driven Process Thread-6] o.a.n.processors.standard.LogAttribute LogAttribute[id=8246b0a0-0168-1000-7254-2c2e43d136a7] logging for flow file StandardFlowFileRecord[uuid=5e1c6d12-298d-4d9c-9fcb-108c208580fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1548374015429-1, container=default, section=1], offset=3424, length=122],offset=0,name=5e1c6d12-298d-4d9c-9fcb-108c208580fa,size=122]
    --------------------------------------------------
    Standard FlowFile Attributes
    Key: 'entryDate'
        Value: 'Thu Jan 24 16:04:44 PST 2019'
    Key: 'lineageStartDate'
        Value: 'Thu Jan 24 16:04:44 PST 2019'
    Key: 'fileSize'
        Value: '122'
    FlowFile Attribute Map Content
    Key: 'filename'
        Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
    Key: 'path'
        Value: './'
    Key: 'restlistener.remote.source.host'
        Value: '127.0.0.1'
    Key: 'restlistener.remote.user.dn'
        Value: 'none'
    Key: 'restlistener.request.uri'
        Value: '/contentListener'
    Key: 'uuid'
        Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
    --------------------------------------------------
    {"timestamp_now":"2019-01-24 16:04:44.518 -0800", "timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}
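The two timestamps in that output can be compared to confirm the 2 millisecond gap mentioned earlier. This small Java sketch (hypothetical class and helper names) parses both values with the same yyyy-MM-dd HH:mm:ss.SSS Z pattern and subtracts them.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class TimestampGapSketch {
    // Hypothetical helper: parses two timestamps written with the pattern used
    // in the replacement value and returns (later - earlier) in milliseconds.
    static long millisBetween(String later, String earlier) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS Z", Locale.US);
        try {
            return fmt.parse(later).getTime() - fmt.parse(earlier).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp", e);
        }
    }

    public static void main(String[] args) {
        long gap = millisBetween("2019-01-24 16:04:44.518 -0800",
                                 "2019-01-24 16:04:44.516 -0800");
        System.out.println(gap + " ms"); // 2 ms
    }
}
```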