Disclaimer: I know absolutely nothing about NiFi.
I need to receive messages from the ListenHTTP processor, and then convert each message into a timestamped JSON message.
So, say I receive the message hello world at 5 am. It should transform it into {"timestamp": "5 am", "message":"hello world"}.
How do I do that?
Each flowfile has attributes, which are pieces of metadata stored as key/value pairs in memory (available for rapid read/write). When any operation occurs, the NiFi framework writes metadata both to the provenance events related to the flowfile and, sometimes, to the flowfile itself. For example, if ListenHTTP is the first processor in the flow, any flowfile that enters the flow will have an attribute entryDate whose value is the time it originated, in the format Thu Jan 24 15:53:52 PST 2019. You can read and write these attributes with a variety of processors (e.g. UpdateAttribute, RouteOnAttribute).
For your use case, you could use a ReplaceText processor immediately following the ListenHTTP processor with a search value of (?s)(^.*$) (the entire flowfile content, or "what you received via the HTTP call") and a replacement value of {"timestamp_now":"${now():format('yyyy-MM-dd HH:mm:ss.SSS Z')}", "timestamp_ed": "${entryDate:format('yyyy-MM-dd HH:mm:ss.SSS Z')}", "message":"$1"}.
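To make the transformation concrete outside NiFi, here is a minimal Python sketch of what the ReplaceText step does to the content. It is only an illustration, not NiFi code: the function names format_timestamp and wrap_message are made up for this example, and it produces just the timestamp_now field. One difference worth noting is that json.dumps escapes quotes in the message, whereas a plain $1 substitution inserts the received text as-is.

```python
import json
import re
from datetime import datetime, timedelta, timezone


def format_timestamp(moment: datetime) -> str:
    """Render a timezone-aware datetime the way 'yyyy-MM-dd HH:mm:ss.SSS Z' would."""
    millis = moment.microsecond // 1000
    return (
        moment.strftime("%Y-%m-%d %H:%M:%S")
        + f".{millis:03d} "
        + moment.strftime("%z")
    )


def wrap_message(content: str, moment: datetime) -> str:
    """Capture the whole body with the same regex and wrap it in JSON."""
    # (?s) lets '.' match newlines, so group 1 is the entire content.
    body = re.match(r"(?s)(^.*$)", content).group(1)
    # json.dumps escapes quotes and newlines in the body; a raw "$1"
    # replacement would not.
    return json.dumps({"timestamp_now": format_timestamp(moment), "message": body})


# Hypothetical sample values, chosen to match the example further down.
pst = timezone(timedelta(hours=-8))
print(wrap_message("helloworld", datetime(2019, 1, 24, 16, 4, 44, 518000, tzinfo=pst)))
```

If the incoming messages can contain double quotes or newlines, it may be worth checking how the ReplaceText output handles them before relying on it as valid JSON.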
The example above provides two options:
entryDate is when the flowfile came into existence via the ListenHTTP processor
the now() function gets the current timestamp in milliseconds since the epoch
Those two values can differ slightly based on performance/queuing/etc. In my simple example, they were 2 milliseconds apart. You can format them using the format() method and the normal Java time format syntax, so you could get "5 am", for example, by using h a (full example: now():format('h a'):toLower()).
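As a rough illustration of what the h a pattern followed by toLower() yields, the Python sketch below computes the same kind of label by hand. The helper name hour_label is hypothetical; it only mirrors the 12-hour clock and am/pm marker, not NiFi's actual implementation.

```python
from datetime import datetime


def hour_label(moment: datetime) -> str:
    """Return a lowercase 12-hour label such as '5 am' or '12 pm'."""
    # 'h' in Java patterns is the 1-12 hour, so 0 and 12 both map to 12.
    hour12 = moment.hour % 12 or 12
    # 'a' is the AM/PM marker; lowercased here to mimic toLower().
    suffix = "am" if moment.hour < 12 else "pm"
    return f"{hour12} {suffix}"


print(hour_label(datetime(2019, 1, 24, 5, 0)))
```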
Example
ListenHTTP running on port 9999 with path contentListener
ReplaceText as above
LogAttribute with log payload true
Curl command: curl -d "helloworld" -X POST http://localhost:9999/contentListener
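If you would rather send the test message from a script than from curl, something like the following Python sketch should be roughly equivalent. It assumes the same host, port, and path as the curl command; build_request and send are names invented for this example, and the Content-Type header is set explicitly to match what curl -d sends by default.

```python
import urllib.request


def build_request(message: str, host: str = "localhost", port: int = 9999,
                  path: str = "contentListener") -> urllib.request.Request:
    """Build a POST request carrying the message as the raw body."""
    url = f"http://{host}:{port}/{path}"
    return urllib.request.Request(
        url,
        data=message.encode("utf-8"),
        method="POST",
        # curl -d uses this content type unless told otherwise.
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


def send(request: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status


# With the ListenHTTP processor running, this would post the message:
# send(build_request("helloworld"))
```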
Example output:
2019-01-24 16:04:44,529 INFO [Timer-Driven Process Thread-6] o.a.n.processors.standard.LogAttribute LogAttribute[id=8246b0a0-0168-1000-7254-2c2e43d136a7] logging for flow file StandardFlowFileRecord[uuid=5e1c6d12-298d-4d9c-9fcb-108c208580fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1548374015429-1, container=default, section=1], offset=3424, length=122],offset=0,name=5e1c6d12-298d-4d9c-9fcb-108c208580fa,size=122]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'lineageStartDate'
Value: 'Thu Jan 24 16:04:44 PST 2019'
Key: 'fileSize'
Value: '122'
FlowFile Attribute Map Content
Key: 'filename'
Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
Key: 'path'
Value: './'
Key: 'restlistener.remote.source.host'
Value: '127.0.0.1'
Key: 'restlistener.remote.user.dn'
Value: 'none'
Key: 'restlistener.request.uri'
Value: '/contentListener'
Key: 'uuid'
Value: '5e1c6d12-298d-4d9c-9fcb-108c208580fa'
--------------------------------------------------
{"timestamp_now":"2019-01-24 16:04:44.518 -0800", "timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}
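To check the gap between the two timestamps in a payload like the one above, a small Python sketch such as this could parse the JSON and subtract the values. The function name lag_ms is hypothetical, and the strptime pattern assumes the timestamps keep the millisecond-and-offset layout used in the replacement value.

```python
import json
from datetime import datetime, timedelta

# Matches e.g. '2019-01-24 16:04:44.518 -0800'.
PATTERN = "%Y-%m-%d %H:%M:%S.%f %z"


def lag_ms(payload: str) -> int:
    """Milliseconds between flowfile creation and the ReplaceText step."""
    record = json.loads(payload)
    created = datetime.strptime(record["timestamp_ed"], PATTERN)
    processed = datetime.strptime(record["timestamp_now"], PATTERN)
    # Dividing two timedeltas with // gives a whole number of milliseconds.
    return (processed - created) // timedelta(milliseconds=1)


sample = ('{"timestamp_now":"2019-01-24 16:04:44.518 -0800", '
          '"timestamp_ed": "2019-01-24 16:04:44.516 -0800", "message":"helloworld"}')
print(lag_ms(sample))
```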