I'm new to Fluentd. I use the fluentd Kubernetes daemonset to collect logs from Docker containers and send them to Kafka. Another Kubernetes service consumes the messages from Kafka and sends them to Elasticsearch, with Kibana on top. I want the log field of the fluentd record to be split into separate fields for further use in Kibana search queries. For instance:
Source record:
"log" : "{\"foo\" : \"bar\"}"
Output:
"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"
I came up with the following config:
<source>
  @type kafka
  brokers "#{ENV['FLUENT_KAFKA_BROKERS']}"
  topics "#{ENV['FLUENT_KAFKA_TOPICS']}"
</source>

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

<match "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type elasticsearch
  host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
  port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
  scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
  logstash_format true
</match>
But unfortunately not all logs are in JSON format, so the json parser fails on plain text: ParserError error="pattern not match with data"
Is it possible to apply the json parser only if the log field is a valid JSON object? If it's plain text, I'd like it to be sent as is.
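(For reference, Fluentd's built-in parser filter also has an emit_invalid_record_to_error option. If I understand it correctly, setting it to false together with reserve_data true should let records whose log field isn't valid JSON pass through unchanged instead of producing the error above; a minimal sketch, not verified in my setup:

<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type parser
  key_name log
  reserve_data true
  # assumption: with reserve_data true a failed parse keeps the original record,
  # and emit_invalid_record_to_error false stops it from being routed to @ERROR
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>
)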
Found this library: https://github.com/ninadpage/fluent-plugin-parser-maybejson/
It doesn't work with the latest fluentd; I will create a PR to address that, it's just a matter of adding "require 'fluent/parser'".
UPDATE: after upgrading the fluentd version, that plugin doesn't work as described in its README. I found another one that works with fluentd 1.4.0:
fluent-plugin-multi-format-parser
I ended up with the following config:
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type multi_format
    <pattern>
      format json
    </pattern>
    <pattern>
      format none
    </pattern>
  </parse>
</filter>
<filter "#{ENV['FLUENT_KAFKA_TOPICS']}">
  @type record_transformer
  # remove the "message" key that the none parser adds for non-JSON logs
  remove_keys message
</filter>
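If I read the two plugins correctly, the result should look like this: a JSON log gets its fields split out (as in the example at the top), while a plain-text log falls through to the none pattern and keeps only its original log field once message is removed. Illustrative records (the plain-text value is made up):

JSON log:
"foo" : "bar"
"log" : "{\"foo\" : \"bar\"}"

Plain-text log:
"log" : "some plain text line"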