Search code examples
regexfluentd

Fluentd filter to exclude key with empty value


I want exclude a row that have serive_name empty "service_name":"". Here is my fluentd conf

## match tag=debug.** and dump to console
<match debug.**>
  @type stdout
</match>
<source>
  @type tail
  path  /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
  pos_file /var/log/td-agent/tmp/wso2carbon.log.pos
  tag debug.wso2.esb
  format /^([TID:]* [^ ]* [^ ]* \[(?<time>[^\]]*)\]) ([^ ]* (?<level>[^ ]*))([^***]*[^=]*[^ ]*(?<service_name>[^,]*)[^=]*[^ ]*(?<step>[^,]*)[^ ]*[^=]*[^ ]*(?<message_id>[^,]*))/
  time_format %Y-%m-%d %H:%M:%S
#  keep_time_key true
</source>

Here is the log out-put

2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" before vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":"","step":"","message_id":""}
2017-08-21 09:57:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SA_VasGWLogSeq","step":" after vasgwInsertlog","message_id":" urn:uuid:2046f0ed-690d-47b1-aa86-d4a71c021a74"}
2017-08-21 10:16:10 +0700 debug.wso2.esb: {"level":"INFO","service_name":" SERVICE_NAME","step":" Before - SA_ServiceApiDSEp","message_id":" urn:uuid:39e0ecc1-dda5-4cd9-91fc-90e7ed4f5233"}

I want to exclude below row. How to do that?

{"level":"INFO","service_name":"","step":"","message_id":""}

The second question is why I have a space before value "service_name":" SERVICE_NAME", when I try on Fluentular I get a nice output without space.

I have solved the second quest by add a space to regex. e.g. change

[^=]*[^ ]*(?<service_name>[^,]*) to [^=]*[^ ]* (?<service_name>[^,]*).


But I don't know how to write a filter to exclude a record by key_name that have empty value like "service_name":"".


Solution

  • Because I cannot find a solution to exclude record that key have empty value, I use the reverse solution. I use grep to keep record with specified key-value. See my Fluentd configuration below.

    Fluentd on each WSO2 node.

    #############################################################################################
    # Fluentd Configuration File                                                                #
    #                                                                                           #
    # In v1 configuration, type and id are @ prefix parameters.                                 #
    # @type and @id are recommended. type and id are still available for backward compatibility #
    #############################################################################################
    
    ################################
    #            Source            #
    ################################
    ## built-in TCP input
    ## $ echo <json> | fluent-cat <tag>
    <source>
      @type forward
      @id forward_input
    
      port 24224
    </source>
    
    # Listen DRb for debug
    <source>
      @type debug_agent
      @id debug_agent_input
    
      bind 127.0.0.1
      port 24230
    </source>
    
    # HTTP input
    # http://localhost:8888/<tag>?json=<json>
    #<source>
    #  @type http
    #  @id http_input
    
    #  port 8888
    #</source>
    
    # Listen HTTP for monitoring
    # http://localhost:24220/api/plugins
    # http://localhost:24220/api/plugins?type=TYPE
    # http://localhost:24220/api/plugins?tag=MYTAG
    <source>
      @type monitor_agent
      @id monitor_agent_input
    
      port 24220
    </source>
    
    <source>
      @type tail
    
      path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
      pos_file /cc/fluent/wso2carbon.log.pos
      tag wso2.esb.service.test
      format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<transaction_id>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<data>[^,]*))/
      time_format %Y-%m-%d %H:%M:%S
      keep_time_key true
    </source>
    
    <source>
      @type tail
    
      path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
      pos_file /cc/fluent/wso2carbon.log.pos
      tag wso2.esb.ne.surepay
      format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^***]+[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<transaction_id>[^,]*)[^<?]+(?<payload>[^{]*))/
      time_format %Y-%m-%d %H:%M:%S
      keep_time_key true
    </source>
    
    <source>
      @type tail
    
      path /opt/wso2esb-4.9.0-wkr-1/repository/logs/wso2carbon.log
      pos_file /cc/fluent/wso2carbon.log.pos
      tag wso2.esb.surepay.trigger
      format /^([TID:]+ [^ ]+ [^ ]+ \[(?<time>[^\]]+)\]) ([^*]+[^=]+[^ ]+(?<client_ip>[^,]*)[^=]+[^ ]+(?<service_name>[^,]*)[^=]+[^ ]+(?<req_id>[^,]*)[^=]+[^ ]+(?<content_massage>[^,]*)[^=]+[^ ]+)/
      time_format %Y-%m-%d %H:%M:%S
      keep_time_key true
    </source>
    
    
    ###########################
    #        Filter           #
    ###########################
    <filter wso2.esb.service.**>
      @type grep
    
      <regexp>
        key     service_name
        pattern ^\sNew
      </regexp>
    </filter>
    
    <filter wso2.esb.service.**>
      @type record_transformer
      enable_ruby
    
      <record>
        data ${record["data"].strip.split(";").each_slice(2).to_h.to_json}
      </record>
    </filter>
    
    <filter wso2.esb.service.**>
      @type parser
    
      format json
      key_name data
    </filter>
    
    <filter wso2.esb.ne.surepay>
      @type grep
    
      <regexp>
        key     service_name
        pattern ^\sNE_SurePay
      </regexp>
    </filter>
    
    <filter wso2.esb.ne.surepay>
      @type record_transformer
      enable_ruby
    
      <record>
        service_name ${record["service_name"].strip!}
        transaction_id ${record["transaction_id"].strip!}
        payload ${record["payload"].strip!}
      </record>
    </filter>
    
    <filter wso2.esb.surepay.trigger>
      @type grep
    
      <regexp>
        key     service_name
        pattern ^\sSurePayPassiveTrigger
      </regexp>
    </filter>
    
    <filter wso2.esb.surepay.trigger>
      @type record_transformer
      enable_ruby
    
      <record>
        client_ip ${record["client_ip"].strip!}
        service_name ${record["service_name"].strip!}
        req_id ${record["req_id"].strip!}
        content_massage ${record["content_massage"].strip!}
      </record>
    </filter>
    
    
    ###########################
    #        Output           #
    ###########################
    ## Debug
    ## match tag=debug.** and dump to console
    <match debug.**>
      @type stdout
      @id stdout_output
    </match>
    
    ## ESB Service Log
    ## match tag=wso2.esb.**. Forward to Fluentd Collector (, stdout for debug) and write to file
    <match wso2.esb.**>
      @type copy
    
      <store>
        @type forward
        @id forward_output
        buffer_path /cc/fluent/buffer/fluentd.forward
        buffer_type file
        flush_interval 10
        send_timeout 60
        heartbeat_type tcp
        heartbeat_interval 20
    
        <server>
          host 172.16.100.243
          port 24224
        </server>
        ## If have sencondary fluentd server for fail-over, enable <secondary> block
        # <secondary>
        #   <server>
        #     host 192.168.0.12
        #   </server>
        # </secondary>
      </store>
    
      <store>
        @type file
        @id file_output
    
        path  /cc/fluent/log/wso2
        time_slice_format %Y%m%d%H
        time_slice_wait   10m
        time_format       %Y-%m-%d %H:%M:%S%z
      </store>
    
      <store>
        @type stdout
      </store>
    </match>
    

    Fluentd collector (collecting data from all fluentd on each node):

    #############################################################################################
    # Fluentd Server Configuration File                                                                #
    #                                                                                           #
    # In v1 configuration, type and id are @ prefix parameters.                                 #
    # @type and @id are recommended. type and id are still available for backward compatibility #
    #############################################################################################
    
    ################################
    #            Source            #
    ################################
    ## built-in TCP input
    ## $ echo <json> | fluent-cat <tag>
    <source>
      @type forward
      @id forward_input
    
      port 24224
    </source>
    
    # Listen DRb for debug
    <source>
      @type debug_agent
      @id debug_agent_input
    
      bind 127.0.0.1
      port 24230
    </source>
    
    # HTTP input
    # http://localhost:8888/<tag>?json=<json>
    #<source>
    #  @type http
    #  @id http_input
    
    #  port 8888
    #</source>
    
    # Listen HTTP for monitoring
    # http://localhost:24220/api/plugins
    # http://localhost:24220/api/plugins?type=TYPE
    # http://localhost:24220/api/plugins?tag=MYTAG
    <source>
      @type monitor_agent
      @id monitor_agent_input
    
      port 24220
    </source>
    
    
    ###########################
    #        Filter           #
    ###########################
    # <filter wso2.esb.service.**>
    #   @type grep
    
    #   <regexp>
    #     key     service_name
    #     pattern ^New
    #   </regexp>
    # </filter>
    
    # <filter wso2.esb.ne.surepay>
    #   @type grep
    
    #   <regexp>
    #     key     service_name
    #     pattern ^NE_SurePay
    #   </regexp>
    # </filter>
    
    # <filter wso2.esb.ne.surepay>
    #   @type grep
    
    #   <regexp>
    #     key     service_name
    #     pattern ^SurePayPassiveTrigger
    #   </regexp>
    # </filter>
    
    
    ###########################
    #        Output           #
    ###########################
    ## Debug
    ## match tag=debug.** and dump to console
    <match debug.**>
      @type stdout
      @id stdout_output
    </match>
    
    ## ESB Service Log
    ## match tag=wso2.esb.service.** and insert into database (, stdout for debug) and write to file
    <match wso2.esb.**>
      @type copy
    
      <store>
        @type sql
        buffer_path /cc/fluent/buffer/fluentd.sql
        buffer_type file
        flush_interval 10
    
        host {ORACLE_HOST}
        port 1521
        database {ORACLE_DATABASE}
        adapter oracle_enhanced
        username {ORACLE_USERNAME}
        password {ORACLE_PADDWORD}
    
        <table>
          table {TABLE_NAME}
          column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
          # This is the default table because it has no "pattern" argument in <table>
          # The logic is such that if all non-default <table> blocks
          # do not match, the default one is chosen.
          # The default table is required.
        </table>
    
        <table wso2.esb.service.test>
          table {TABLE_NAME}
          column_mapping 'insert_date:insert_date,transaction_id:transaction_id,service_name:service_name,process_step:process_step,msisdn:msisdn,command:command,transaction_type:transaction_type,action:action,service_price:service_price,subcriber_type:subcriber_type,transaction_status:transaction_status,notification:notification,remark:remark,vas_error_code:vas_error_code,client_username:client_username,client_ip:client_ip,api_url:api_url,api_method:api_method,nei_name:nei_name,nei_error_code:nei_error_code,server_host:server_host'
        </table>
    
        <table wso2.esb.ne.surepay>
          table {TABLE_NAME}
          column_mapping 'time:insert_date,transaction_id:transaction_id,service_name:service_name,payload:payload'
        </table>
    
        <table wso2.esb.surepay.trigger>
          table {TABLE_NAME}
          column_mapping 'time:insert_date,client_ip:client_ip,service_name:service_name,req_id:req_id,content_massage:content_massage'
        </table>
      </store>
    
      <store>
        @type file
        path  /cc/fluent/log/service
        time_slice_format %Y%m%d%H
        time_slice_wait   10m
        time_format       %Y-%m-%d %H:%M:%S%z
      </store>
    
      <store>
        @type stdout
      </store>
    </match>
    

    NOTE: I use frontd to tail log from WSO2 and then insert into Oracle Database.

    PLATFORM: RedHat 7, ruby 2.4.1p111, fluentd 0.12.40, activerecord-oracle_enhanced-adapter (1.8.2), ruby-oci8 (2.2.5), fluent-plugin-sql (0.6.1).

    UPDATE I have publish all configuration and installation detail on GitHub https://github.com/oemdaro/fluent-oracle-example