Tags: elasticsearch, logstash, logstash-configuration, filebeat

Issue with the logstash config while parsing multiline log


I have the multiline log below, which I am trying to parse with my Logstash config.

2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
                    EMAIL=abc.def@example.com
                    NAME=abc.def
                    PAGER=+11111111111111
                    DATE=2020-05-27
                    AUTHOR=
                    COMMENT=
                    ADDRESS=1.1.1.1
                    ALIAS=abc.example.com
                    ATTEMPT=1
2020-05-27 11:59:17 Previous service hard state not known. Allowing all states.
2020-05-27 11:59:17 Computed variables:
                    URL=abc.example.com
                    STATE=UP                
2020-05-27 11:59:17 Preparing flexible notifications for abc.def
2020-05-27 11:59:17  channel with plugin sms
2020-05-27 11:59:17  - Skipping: set
2020-05-27 11:59:17  channel with plugin plain email        
2020-05-27 11:59:20 --------------------------------------------------------------------

This is my logstash config:

    input {
      stdin { }
    }

    filter {
      grok {
        match => { "message" => "(?m)%{GREEDYDATA:data}" }
      }
      if [data] {
        mutate {
          gsub => [ "data", "^\s*", "" ]
        }
        mutate {
          gsub => [ "data", "\n", " " ]
        }
      }
    }

    output {
      stdout { codec => rubydebug }
    }

Filebeat config:

  multiline.pattern: '^[[:space:]][A-Za-z]* (?m)'
  multiline.negate: false
  multiline.match: after

What I want to achieve: the multiline log should first be matched against the multiline pattern and split into messages like

  Message1:  2020-05-27 11:59:17 ----------------------------------------------------------------------

  Message2: 2020-05-27 11:59:17 Got context

  Message3:  2020-05-27 11:59:17 Raw notification context:
                        EMAIL=abc.def@example.com
                        NAME=abc.def
                        PAGER=+11111111111111
                        DATE=2020-05-27
                        AUTHOR=
                        COMMENT=
                        ADDRESS=1.1.1.1
                        ALIAS=abc.example.com
                        ATTEMPT=1

After this, when these log lines get parsed, each message should be split again on a delimiter so that I can use the kv filter to read each key-value pair (e.g. ALIAS=abc.example.com) within a single message such as message 3.
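The grouping described above amounts to: start a new message whenever a line begins with a timestamp, otherwise append the line to the previous message. A minimal Python sketch of that logic (illustrative only; in practice Filebeat or a Logstash multiline codec does this):

```python
import re

LOG = """2020-05-27 11:59:17 ----------------------------------------------------------------------
2020-05-27 11:59:17 Got context
2020-05-27 11:59:17 Raw context:
                    EMAIL=abc.def@example.com
                    NAME=abc.def
                    ATTEMPT=1"""

# A new message starts whenever a line begins with an ISO-style timestamp.
TS = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

messages = []
for line in LOG.splitlines():
    if TS.match(line) or not messages:
        messages.append(line)           # timestamp prefix: start a new message
    else:
        messages[-1] += "\n" + line     # continuation line: append to previous

print(len(messages))  # → 3; the third message carries the KEY=VALUE lines
```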

Can you advise how to achieve this?


Solution

  • I would suggest reading from the file using a multiline codec (you can also set the same codec on the stdin input if you are using stdin), with a pattern that treats every line starting with a timestamp as the beginning of a new event.

    Then use the kv filter to split the message into fields and values, like this:

    input {
        file {
            path => "C:/work/elastic/logstash-6.5.0/config/test.txt"
            start_position => "beginning"
            codec => multiline {
                pattern => "^%{TIMESTAMP_ISO8601}"
                negate => true
                what => "previous"
            }
        }
    }
    filter {
        kv {
            field_split => "\r\n"
            value_split => "="
            source => "message"
        }
    }
    
    output {
        elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "test"
        }
    }
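What the kv filter does to one of these multiline events can be approximated with a small Python sketch (an illustration of the splitting logic only, not Logstash internals; sample values are copied from the question's log):

```python
import re

# One multiline event as the codec above would emit it.
message = (
    "2020-05-27 11:59:17 Raw context:\n"
    "                    EMAIL=abc.def@example.com\n"
    "                    ALIAS=abc.example.com\n"
    "                    ATTEMPT=1"
)

# Mimic kv: split the event on line breaks (field_split), then split each
# piece on the first '=' (value_split). Lines without '=' are skipped.
fields = {}
for part in re.split(r"\r\n|\n", message):
    part = part.strip()
    if "=" in part:
        key, _, value = part.partition("=")
        fields[key] = value

print(fields["ALIAS"])  # → abc.example.com
```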
    

    Result in Kibana should look like this: each KEY=VALUE pair from the event indexed as its own field (screenshots omitted).

    EDIT: In the comments you stated that you failed to see the full value including spaces. I have re-tested my solution with the new STATE value you provided, and it works fine (screenshot omitted).