fluentd, td-agent

Extracting and logging only value of a key in fluentd


I have an input file of line-delimited JSON messages. The messages have the following format.

{"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}}
{"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}}
{"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}

I want to save only the message part of each record to a MongoDB database using fluentd. In MongoDB, each document in the collection should look like this.

{
    "_id": ObjectId("626a1b813c04335a858e5926"),
    "accountId": 99,
    "end_dateTime": "",
    "id": 0.22837359658442535,
    "log": []
}

In other words, I want to extract and save only the value of the message key from each input record.

I tried the config below, but it does not work.

<source>
  @type tail
  @id input_tail2
  read_from_head true
  path "/opt/webhook-logs/webhook.log"
  pos_file "/opt/webhook-logs/webhook.log.pos"
  tag "td.mongo.events"
  <parse>
    @type "json"
    unmatched_lines 
  </parse>
</source>
<match td.mongo.**>
  @type mongo
  host "127.0.0.1"
  port 27017
  database "vonnect"
  collection "webhooklog"
  user "vonnect"
  password xxxxxx
  buffer_chunk_limit 8m
  time_key time
  <buffer>
    flush_interval 2s
  </buffer>
  <inject>
    time_key time
  </inject>
</match>

I also tried a record_transformer filter, without success. The filter config looks like this:

<filter td.mongo.**>
  @type record_transformer
  renew_record true
  # enable_ruby
  # auto_typecast true
  <record>
    ${record["message"]}
  </record>
</filter>

Solution

  • Here's a working example using the record_transformer and parser filter plugins:

    fluent.conf

    <source>
      @type sample
      @id in_sample
      sample [
        {"level":"info","message":{"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}},
        {"level":"info","message":{"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}},
        {"level":"info","message":{"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}}
      ]
      tag sample
    </source>
    
    <filter sample>
      @type record_transformer
      renew_record true
      keep_keys message
      enable_ruby true
      <record>
        message ${record["message"].to_json.to_s}
      </record>
    </filter>
    
    <filter sample>
      @type parser
      key_name message
      reserve_data true
      remove_key_name_field true
      <parse>
        @type json
      </parse>
    </filter>
    
    <match sample>
      @type stdout
    </match>
    

    Run:

    fluentd -c fluent.conf
    

    Output:

    2022-04-28 14:04:29.094892632 +0500 sample: {"accountId":99,"end_dateTime":"","id":0.22837359658442535,"log":[]}
    2022-04-28 14:04:30.097973274 +0500 sample: {"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}
    2022-04-28 14:04:31.000677835 +0500 sample: {"accountId":200,"end_dateTime":"","id":0.5783,"log":[]}
    

    The record_transformer filter drops every key except message and replaces its value with a JSON string. The parser filter then parses that string as JSON, promotes the resulting keys to the top level of the record, and removes the message key.
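
To make the two-step flow concrete, here is a minimal Ruby sketch that imitates what the two filters do to a single record. It is not Fluentd code and does not call any Fluentd API: stringify_message and expand_message are hypothetical helper names chosen for illustration, and the sketch assumes every record has a message key holding a JSON object.

```ruby
require "json"

# Step 1 (imitates record_transformer with renew_record true):
# start from an empty record and keep only "message",
# with its value serialized to a JSON string.
def stringify_message(record)
  { "message" => record["message"].to_json.to_s }
end

# Step 2 (imitates the parser filter with key_name message,
# reserve_data true, remove_key_name_field true):
# parse the string, drop the "message" key, and merge the
# parsed keys into whatever else is left in the record.
def expand_message(record)
  parsed = JSON.parse(record["message"])
  rest = record.reject { |key, _value| key == "message" }
  rest.merge(parsed)
end

input = {
  "level" => "info",
  "message" => { "accountId" => 100, "end_dateTime" => "", "id" => 0.2583, "log" => [] }
}

result = expand_message(stringify_message(input))
puts result.to_json
# prints {"accountId":100,"end_dateTime":"","id":0.2583,"log":[]}
```

The intermediate string step exists because the parser filter expects the value of key_name to be text it can parse, whereas the original message value is already a nested object.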
