How to filter JSON using Logstash / Filebeat and Grok


I am very new to these technologies and have been scratching my head for over a week trying to find a suitable answer. I have a log file like:

"2017-04-13 17:15:34.649 INFO  [http-bio-8080-exec-5] Adapter:132 |Empty|Empty|===Request object=== GetTransKey=============
"2017-04-13 17:15:34.699 INFO  [http-bio-8080-exec-5] Adapter:133 |Empty|Empty|Request object : sessionId:null,  busiCode:GetTransKey,  reqPubInfo:{"appId":"com.info.tss","sessionId":null,"version":"10000","timestamp":"20150206165957","lang":"EN","userId":null,"serviceId":null,"circleId":null,"route":null,"customerId":null,"osType":null}, param:{"type":0,"key":"MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCKmsCyw+YomiNbvkUP3D7OtvOMd7jq0aNa0APSp5E5PsYW7fpaUMniWkQeAwD3EmhzF5v3oXGA2bqAZ+b0ZJgv2BoEGYPoaCzOZBglDzUe8xldK5mMJHLiMwL0enkwURQvubnTUAxXMS0SPcXq4/jyX9mBu27Ht+zjT8Y3vO51JwIDAQAB","deviceInfo":null}
"2017-04-13 17:15:34.699 INFO  [http-bio-8080-exec-5] Adapter:137 |Empty|Empty|Event:GetTransKey|StartTime:1492083934699ms
"2017-04-13 17:15:34.713 DEBUG [http-bio-8080-exec-5] RedisCache:72 |Empty|Empty|===mode=1 Redis cache connect to host:10.135.25.108 port:28333
"2017-04-13 17:15:34.720 DEBUG [http-bio-8080-exec-5] RedisCache:159 |Empty|Empty|{"lifo":true,"fairness":false,"maxWaitMillis":20,"minEvictableIdleTimeMillis":60000,"softMinEvictableIdleTimeMillis":1800000,"numTestsPerEvictionRun":-1,"evictionPolicyClassName":"org.apache.commons.pool2.impl.DefaultEvictionPolicy","testOnCreate":false,"testOnBorrow":false,"testOnReturn":true,"testWhileIdle":true,"timeBetweenEvictionRunsMillis":30000,"blockWhenExhausted":true,"jmxEnabled":true,"jmxNamePrefix":"pool","jmxNameBase":null,"maxTotal":50,"maxIdle":10,"minIdle":0}
"2017-04-13 17:15:42.830 INFO  [http-bio-8080-exec-5] Adapter:145 |Empty|Empty|Event:GetTransKey|End Time:1492083942830ms|Total Time:8131ms|Status:0
"2017-04-13 17:15:42.831 INFO  [http-bio-8080-exec-5] Adapter:148 |Empty|Empty|===Resp data===  GetTransKey=============
"2017-04-13 17:15:42.831 INFO  [http-bio-8080-exec-5] Adapter:149 |Empty|Empty|Resp object : sessionId:null,  busiCode:GetTransKey,  respData:{"transKey":"W73GHuCMhSXnihDxlBA/QKzbF4dhqZlLWylINlvi4Ben1ViECepll2zL7Az489Uk4/e0HsT3/zkG\nSyIB9M9EDbp9rLqZIARCcBRUIYJ/N3YIDrQSvD7SyoIjg+ti/my17U/TLVgi3BLPkMQw9/0XhNpA\n/LYePHed2pe0FYun3xo=","sessionId":"216bc5f3-cdec-4998-9494-717c8e3769a6"}

Here I am only interested in two JSON objects, reqPubInfo and respData. However, I cannot work out how to parse them; the documentation feels like a sea. Please advise how to parse only the JSON objects in these logs.

So far my config file looks like:

input {
    beats {
        port => "5043"
    }

}
filter {
json {
    source => "message"
  }
}
output {
    stdout { codec => rubydebug }
}

Solution

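  • You need to parse the message using a grok filter before you can apply the JSON filter. The json filter expects its source field to contain nothing but JSON, and here each message starts with a plain-text log header, so pointing it directly at message fails. It would be better if you could modify the application's logger configuration to output pure JSON so that grok parsing isn't needed at all.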

    filter {
      # Parse the log message.
      grok {
        pattern_definitions => {
          "LOGDATE"   => "[\d]{4}-[\d]{2}-[\d]{2} %{TIME}"
          "LOGHEADER" => "%{LOGDATE:logdate} %{LOGLEVEL:level}\s+\[%{GREEDYDATA:thread}\] %{NOTSPACE:file}:%{NUMBER:line}\s?"
        }   
        match => {
          message => [
            "%{LOGHEADER} %{GREEDYDATA:message} reqPubInfo:%{GREEDYDATA:reqPubInfo}, param:%{GREEDYDATA:param}",
            "%{LOGHEADER} %{GREEDYDATA:message} respData:%{GREEDYDATA:respData}",
            "%{LOGHEADER} %{GREEDYDATA:message}"
          ]   
        }   
        overwrite => [ "message" ]
      }   
    
      # Set @timestamp using the date in the log message.
      date {
        match => [ "logdate", "yyyy-MM-dd HH:mm:ss.SSS" ]
        remove_field => [ "logdate" ]
      }   
    
      # Parse the JSON data.
      if [reqPubInfo] {
        json {
          source => "reqPubInfo"
          target => "reqPubInfo"
        }   
        json {
          source => "param"
          target => "param"
        }   
      } else if [respData] {
        json {
          source => "respData"
          target => "respData"
        }   
      }   
    }
    

    The self-contained config I used for testing can be found here.
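
To sanity-check the two-step idea outside Logstash (first isolate the JSON text with a pattern, then parse it), here is a minimal Python sketch. It is not part of the Logstash config above, only a rough approximation of it: the regular expressions are hand-written stand-ins for the grok patterns, `parse_line` and the constant names are hypothetical, and the sample line is a shortened version of the request line from the question.

```python
import json
import re

# Hand-written approximation of the LOGHEADER grok pattern (an assumption,
# not grok's exact definition). Group names mirror the grok field names.
HEADER = (
    r'(?P<logdate>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) '
    r'(?P<level>[A-Z]+)\s+\[(?P<thread>[^\]]+)\] '
    r'(?P<file>\S+):(?P<line>\d+)\s?'
)

# Tried in order, like the grok "match" list: most specific pattern first.
PATTERNS = [
    re.compile(HEADER + r' (?P<message>.*) reqPubInfo:(?P<reqPubInfo>.*), param:(?P<param>.*)'),
    re.compile(HEADER + r' (?P<message>.*) respData:(?P<respData>.*)'),
    re.compile(HEADER + r' (?P<message>.*)'),
]

# Fields whose captured text should be decoded as JSON (the json filter step).
JSON_FIELDS = ("reqPubInfo", "param", "respData")


def parse_line(text):
    """Return a dict of extracted fields, or None if no pattern matches."""
    for pattern in PATTERNS:
        # search() is unanchored, so the leading quote on each line is skipped.
        match = pattern.search(text)
        if match is None:
            continue
        event = match.groupdict()
        for name in JSON_FIELDS:
            if name in event:
                # Raises json.JSONDecodeError on bad input; Logstash's json
                # filter would tag the event with _jsonparsefailure instead.
                event[name] = json.loads(event[name])
        return event
    return None


# Shortened version of the request line from the question (fields trimmed).
SAMPLE = (
    '"2017-04-13 17:15:34.699 INFO  [http-bio-8080-exec-5] Adapter:133 '
    '|Empty|Empty|Request object : sessionId:null,  busiCode:GetTransKey,  '
    'reqPubInfo:{"appId":"com.info.tss","version":"10000","lang":"EN"}, '
    'param:{"type":0,"deviceInfo":null}'
)

if __name__ == "__main__":
    event = parse_line(SAMPLE)
    print(event["reqPubInfo"]["appId"])
    print(event["param"])
```

The order of the patterns matters for the same reason it does in the grok filter: the generic header-plus-message pattern matches every line, so it has to come last or the JSON fields would never be captured.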