Tags: elasticsearch, kibana, fluentd

Fluentd route based on payload type


I'm new to Fluentd. I use a Kubernetes DaemonSet running Fluentd to collect logs from Docker containers and send them to Kafka. Another Kubernetes service consumes the messages from Kafka and sends them to Elasticsearch, fronted by Kibana. I want the log field of the Fluentd record to be split into separate fields for later use in Kibana search queries. For instance:

source record:

"log" : "{\"foo\" : \"bar\"}"

Output:

"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"
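In other words, the desired filter parses the JSON string stored in the log field and merges its keys into the record, while keeping the original log field. A minimal Python sketch of that behavior (split_log_field is a hypothetical name, not part of Fluentd):

```python
import json

# Hypothetical helper sketching the desired transformation: parse the JSON
# string in "log" and merge its keys into the record, keeping the original
# "log" field (analogous to reserve_data true in Fluentd's parser filter).
def split_log_field(record):
    try:
        parsed = json.loads(record.get("log", ""))
        if isinstance(parsed, dict):
            return {**record, **parsed}
    except json.JSONDecodeError:
        pass  # not valid JSON: leave the record unchanged
    return record

print(split_log_field({"log": '{"foo" : "bar"}'}))
# → {'log': '{"foo" : "bar"}', 'foo': 'bar'}
```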

I came up with the following config:

<source>
@type kafka
brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type parser
key_name log
reserve_data true
 <parse>
   @type json
 </parse>
</filter>

<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
@type elasticsearch
host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
logstash_format true
</match>

Unfortunately, not all of my logs are in JSON format, so the JSON parser fails on plain-text entries: ParserError error="pattern not match with data

Is it possible to apply the JSON parser only when the log field is a valid JSON object? If it is plain text, I'd like it to be forwarded as is.


Solution

  • Found this library: https://github.com/ninadpage/fluent-plugin-parser-maybejson/

    It doesn't work with the latest Fluentd; I will create a PR to address that. It's just a matter of adding require 'fluent/parser'.

    UPDATE: after upgrading Fluentd, the plugin doesn't work as described in its README. I found another one that works with Fluentd 1.4.0:

    fluent-plugin-multi-format-parser

    I ended up with the following config:

    <filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
      @type parser
      key_name log
      reserve_data true
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>
    
    <filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
      @type record_transformer
      remove_keys message  # remove the message key from non-JSON logs
    </filter>
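The multi_format parser tries each pattern in order: the json pattern first, then none, which keeps the raw line under a message key (its default key name). Since reserve_data true already preserves the raw text in the log field, that message key is redundant, which is what the record_transformer filter removes. The fallback logic can be sketched in Python (parse_with_fallback is a hypothetical name):

```python
import json

# Sketch of the multi_format fallback: try the json pattern first; if it
# does not match, behave like "format none", which stores the raw line
# under the default "message" key.
def parse_with_fallback(raw_log):
    try:
        parsed = json.loads(raw_log)
        if isinstance(parsed, dict):
            return parsed  # json pattern matched
    except json.JSONDecodeError:
        pass
    return {"message": raw_log}  # none pattern: raw text under "message"

print(parse_with_fallback('{"foo" : "bar"}'))  # {'foo': 'bar'}
print(parse_with_fallback('plain text line'))  # {'message': 'plain text line'}
```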