Tags: elasticsearch, logstash, elastic-stack, logstash-grok

How to write a grok pattern in Logstash


I am trying to get started with Logstash, and my application produces the following type of logs. Here the 5 indicates that five more lines will follow, with stats collected for different related things.

These are basically application stats, with each line describing one of the resources.

Is there a way to properly parse this using Logstash so that it can be used with Elasticsearch?

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]

EDIT:

This is the configuration I am using. With it, the first set of stats is parsed properly, but after that the pipeline gets stuck. Please note there are 150 such log blocks, but if I keep only 2-3 of them it works fine. Can you please help me identify the issue here?

# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"
### For testing and continually reprocessing the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Let's read the logfile and recombine multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}

filter {
    if [message] =~ /^\s*$/ {
        drop {}
    }
    if [message] =~ /^[^\[]/ {
        drop {}
    }

    if [message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/ {
        drop {}
    }
    if [message] =~ "logPerDestinationStats" {
        grok {
            match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*(%{DATA:thread_name})\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*" }
        }
        split {
            field => "message"
        }
        grok {
                match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
}
output {
  stdout {codec => rubydebug}
}

Solution

  • There certainly is.

    What you will need to do is utilise the multiline codec on your file input.

    As per the example:

    input {
      file {
        path => "/var/log/someapp.log"
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
          negate => true
          what => previous
        }
      }
    }
    

    This basically states that any line that doesn't start with a YYYYMMDD HH:mi:ss.SSS timestamp will be merged with the previous line.
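
    For example, the header line plus the five stats lines from your sample would arrive as a single event, with the message field holding all six lines joined by newlines:

    [20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
    [fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
    (...and likewise for strvalue2 through strvalue5)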

    From there you can now apply Grok patterns to the first line (to get high level data).

    Once you're happy you have all the data you require from the first line, you can then split on \r or \n and get individual stats data using a single grok pattern (based on the examples you gave above).
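
    A minimal sketch of that second stage (the \n terminator and the two captured fields are assumptions based on your sample; adjust to suit):

    filter {
      # break the recombined multiline event back into one event per stats line
      split {
        field => "message"
        terminator => "\n"
      }
      # quick parse of each stats line; extend the pattern for the remaining fields
      # (the header line will simply be tagged _grokparsefailure here)
      grok {
        match => { "message" => "^\[fieldA:\s*%{DATA:fieldA}\s*\|\s*field2:\s*%{NUMBER:field2:int}" }
      }
    }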

    Hope this helps

    D

    Update 2017-05-08 11:54:

    The full Logstash conf could look something like this; you will need to consider changing the grok patterns to better suit your requirements (only you know your data).

    Note: this hasn't been tested, I leave that up to you.

    input {
      file {
        path => "/var/log/someapp.log"
    ### For testing and continually reprocessing the same file; remove these before production
        start_position => "beginning"
        sincedb_path => "/dev/null"
    ### Let's read the logfile and recombine multiline details
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
          negate => true
          what => previous
        }
      }
    }
    filter {
    ### Let's get some high level data before we split the line (note: anything you grab before the split gets copied)
        grok {
            match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]" }
        }
    ### Split the lines back out into single-line events now (the terminator may be \r or \n; test which one)
        split {
            field => "message"
            terminator => "\r"
        }
    ### OK, the lines should now be independent; let's add another grok here to capture the fields as dictated by your example [fieldA: str | field2: 0...] etc.
    ### Note: you should look to change the grok pattern to better suit your requirements; I used DATA here to quickly capture your content
        grok {
            break_on_match => false
            match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
    output {
      stdout { codec => rubydebug }
    }
    

    EDIT 2017-05-15

    Logstash is a complex parser; it expects to stay up as a process and continuously monitor the log files (hence why you have to kill it to stop it).
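
    On that note, the multiline codec only knows an event is complete when it sees the next line that doesn't belong to the event, so the final stats block in the file can sit pending, which can look like a stuck pipeline. A sketch using the codec's auto_flush_interval option (the 5-second value is an assumption; tune it to your logging cadence):

    codec => multiline {
      pattern => "^\[destName:"
      negate => false
      what => "previous"
      # flush a pending multiline event after 5 seconds of inactivity, so the
      # last block is emitted without waiting for a following line to arrive
      auto_flush_interval => 5
    }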

    break_on_match means a single grok can carry multiple match patterns for the same field: with the default (true), grok tries them in order and stops at the first one that matches, so always order them complex to simple; set it to false and grok keeps applying the remaining patterns even after one matches.
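
    For illustration, a hypothetical grok carrying two patterns for the same field, ordered complex to simple:

    grok {
      # patterns are tried in order; with break_on_match at its default (true),
      # grok stops at the first pattern that matches
      match => {
        "message" => [
          "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount:int}",
          "^\[%{GREEDYDATA:raw_stats}\]$"
        ]
      }
    }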

    In your input, change the path to end with .log*. Also, as per your original example, doesn't the pattern have to match the date format (in order to bring all the associated lines onto a single line)?

    Your split filter should also specify the terminator, I believe: the default is \n, but if your lines actually end in \r\n then each split line keeps a trailing \r and the grok \]$ anchor won't match.

    input {
      file {
        path => "/u/bansalp/activemq_primary_plugin.stats.log*"
    ### For testing and continually reprocessing the same file; remove these before production
        start_position => "beginning"
        sincedb_path => "/dev/null"
    ### Let's read the logfile and recombine multiline details
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^\[destName:"
          negate => false
          what => "previous"
        }
      }
    }
    
    filter {
        if "logPerDestinationStats" in [message] {
            grok {
                match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*" }
            }
            split {
                field => "message"
                terminator => "\r"
            }
            grok {
                    match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
            }
            mutate {
                convert => { "message" => "string" }
                add_field => {
                    "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                    "load_timestamp" => "%{@timestamp}"
                }
                remove_field => ["yr","mnt", "daynum", "time", "timezone"]
            }
        }
        else {
            drop {}
        }
    }
    

    Please excuse the formatting; I'm currently updating this from a mobile. I am happy for someone to update the formatting in my stead.