Search code examples
elasticsearchkubernetessplunkfluentd

How to filter logs based on severity in fluentd and send it to 2 different logging systems


i need help to configure Fluentd to filter logs based on severity.

we have 2 different monitoring systems Elasticsearch and Splunk, when we enabled log level DEBUG in our application it's generating tons of logs everyday, so we want to filter logs based severity and push it to 2 different logging systems.

when logs has severity: INFO and ERROR then forward container logs to Splunk and except those DEBUG, TRACE, WARN and other logs should go to elastocsearch, please help me how can we do filter it.

Here is the log generated format:

event.log:{"@severity":"DEBUG","@timestamp":"2019-01-18T00:15:34.416Z","@traceId":

event.log:{"@severity":"INFO","@timestamp":"2019-01-18T00:15:34.397Z","@traceId":

event.log:{"@severity":"WARN","@timestamp":"2019-01-18T00:15:34.920Z","@traceId":

please find below fluentd config.

i have added exclude method inside filter and also installed grep plugin added grep method, its not working.

added filter for testing:

<exclude>
       @type grep
       key severity 
       pattern DEBUG
      </exclude>

also added:

<filter kubernetes.**>
@type grep
exclude1 severity (DEBUG|NOTICE|WARN)
</filter>

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config
  namespace: logging
  labels:
    k8s-app: fluentd
data:
  fluentd-standalone.conf: |
    <match fluent.**>
      @type null
    </match>
    # include other configs
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    @include systemd.conf
    @include kubernetes.conf
  fluentd.conf: |
    # Use the config specified by the FLUENTD_CONFIG environment variable, or
    # default to fluentd-standalone.conf
    @include "#{ENV['FLUENTD_CONFIG'] || 'fluentd-standalone.conf'}"
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
      <exclude>
       @type grep
       key severity 
       pattern DEBUG
      </exclude>
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    <filter kubernetes.**>
    @type grep
    exclude1 severity (DEBUG|NOTICE|WARN)
    </filter>
  kubernetes.conf: |
    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      verify_ssl false
    </filter>
    <filter kubernetes.**>
      @type record_transformer
      enable_ruby
      <record>
        event ${record}
      </record>
      renew_record
      auto_typecast
    </filter>
    # The `all_items` paramater isn't documented, but it is necessary in order for
    # us to be able to send k8s events to splunk in a useful manner
    <match kubernetes.**>
      @type copy
      <store>
        @type splunk-http-eventcollector
        all_items true
        server localhost:8088
        protocol https
        verify false
      </store>
      <store>
        @type elasticsearch
        host localhost
        port 9200
        scheme http
        ssl_version TLSv1_2
        ssl_verify false
        </buffer>
      </store>
    </match>


Solution

  • How about the following? (not tested)

    <source>
      @type tail
      @log_level debug
      path /var/log/containers/*.log
      pos_file /var/log/kubernetes.log.pos
      time_format %Y-%m-%dT%H:%M:%S.%NZ
      tag kubernetes.*
      format json
      @label @INPUT
    </source>
    
    <label @INPUT>
      <filter kubernetes.**>
        @type kubernetes_metadata
        verify_ssl false
      </filter>
      <filter kubernetes.**>
        @type record_transformer
        enable_ruby
        <record>
          event ${record}
        </record>
        renew_record
        auto_typecast
      </filter>
      <match>
        @type relabel
        @label @RETAG
      </match>
    </label>
    
    <label @RETAG>
      <match>
        @type rewrite_tag_filter
        <rule>
          key @severity
          pattern /(INFO|ERROR)/
          tag splunk.${tag}
        </rule>
        <rule>
          key @severity
          pattern /(DEBUG|TRACE|WARN)/
          tag elasticsearch.${tag}
        </rule>
        @label @OUTPUT
      </match>
    </label>
    
    <label @OUTPUT>
      <match splunk.**>
        @type splunk-http-eventcollector
        # ... snip
      </match>
      <match elasticsearch.**>
        @type elasticsearch
        # ... snip
      </match>
    </label>