Logstash to merge unstructured log lines to one event


My log format looks like this:

[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] <application><status>Active</status></application>
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student04]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2
[23:59:43] TYPE[student03]
[23:59:43] Write Ack!
[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH
[23:59:43] --------PushToFile Start--------
[23:59:43] --------PushToFile Name --------
[23:59:43] --------PushToFile End--------
[23:59:47] --------on parser--------
[23:59:47] ClientIPAddress > 123.12.123.2

I want to parse these logs and keep only the TYPE line (for example TYPE[student03]) and the status of the student. I am dropping the remaining lines with Logstash filters. Now I need the TYPE and the status combined into a single event so that I can send it to RabbitMQ as a single message, like this:

[23:59:43]-student03-<application><status>Active</status></application>

How can we achieve this with the aggregate filter? I tried it, but I think aggregate needs a common pattern in both log lines in order to merge them. Are there any alternatives?

Please help!

My Logstash configuration file:

input {
  beats {
    port => 5044
  }
}
filter {

  if ([message] =~ "Write Ack!") {
    drop {}
  }
  else if ([message] =~ "PushToFile Start") {
    drop {}
  }
  else if ([message] =~ "PushToFile Name") {
    drop {}
  }
  else if ([message] =~ "PushToFile End") {
    drop {}
  }
  else if ([message] =~ "on parser") {
    drop {}
  }
  else if ([message] =~ "ClientIPAddress") {
    drop {}
  }
}
output {
 stdout { codec => rubydebug }
}

Solution

  • You can use a grok filter (see below) to keep only the two required lines, then use the Ruby filter below to do the necessary string operations and store the merged message in a field.

    Grok filter

    grok {
        match => {
            "message" => [
                "%{TIME}] TYPE.%{GREEDYDATA:STATUS}.",
                "%{TIME}] <application><status>Active</status></application>"
            ]
        }
    }
    if "_grokparsefailure" in [tags] { drop {} }
    

    Ruby filter

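    ruby {
        code => "
            if event.get('STATUS')
                # TYPE line: remember the student and cancel this event
                @save_status = event.get('STATUS')
                event.cancel()
            else
                # status line: insert the remembered student after the timestamp
                event.set('final_message', event.get('message').sub(' ', '-'+@save_status+'-'))
            end
        "
    }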
    

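    Please note that this works only if the log lines arrive in sequence, because the Ruby filter remembers the most recent TYPE line in an instance variable and applies it to the next status line. In practice this usually means running the pipeline with a single worker (pipeline.workers set to 1) so that events are not processed out of order.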
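
To try the merge logic outside Logstash, here is a minimal standalone Ruby sketch of the same idea. It is an approximation, not the pipeline itself: TIME_RE is a simplified stand-in for grok's %{TIME}, merge_lines is a hypothetical helper name, and the guard for a status line that arrives before any TYPE line is an addition that the filter above does not have.

```ruby
# Simplified stand-ins for the two grok patterns (assumption: TIME_RE only
# approximates grok's %{TIME}).
TIME_RE   = /\d{2}:\d{2}:\d{2}/
TYPE_RE   = /\A\[#{TIME_RE}\] TYPE\[(?<status>[^\]]+)\]\z/
STATUS_RE = %r{\A\[#{TIME_RE}\] <application><status>Active</status></application>\z}

# Hypothetical helper: walks the lines in order and merges each status line
# with the most recent TYPE line, like the grok + ruby filter pair.
def merge_lines(lines)
  saved_type = nil
  merged = []
  lines.each do |line|
    if (m = TYPE_RE.match(line))
      # TYPE line: remember the student (like @save_status) and emit nothing
      saved_type = m[:status]
    elsif STATUS_RE.match?(line)
      # Added guard: skip a status line that has no preceding TYPE line
      next if saved_type.nil?
      # Replace the first space with -<student>-
      merged << line.sub(' ', "-#{saved_type}-")
    end
    # Any other line is ignored, like the _grokparsefailure drop
  end
  merged
end

sample = [
  '[23:59:43] TYPE[student03]',
  '[23:59:43] Write Ack!',
  '[23:59:43] <application><status>Active</status></application>',
  '[23:59:43] --------PushToFile Start--------',
  '[23:59:43] TYPE[student04]',
  '[23:59:43] FGHSFHG3453D56HJ3456FJ372GJ1387FFKJH'
]

puts merge_lines(sample)
```

For the sample lines this prints the single merged message for student03. The student04 block has no Active status line, so nothing is emitted for it. Because the remembered student is just the last TYPE line seen, feeding the lines out of order would pair a status with the wrong student, which is why the sequence requirement matters.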