I am parsing syslog into the ELK stack.
Syslog example:
Jul 19 10:47:21 host-abc systemd: Started myservice
Jul 19 10:47:29 host-abc systemd: Started service.
Jul 19 10:47:29 host-abc systemd: Starting service...
What I would ideally like to do is aggregate the second and third lines into one message, for example returning: Started service. Starting service...
Therefore, I would like the timestamp, hostname and program name to match before the lines are combined.
You can use the aggregate filter to achieve what you want. The aggregate filter supports aggregating several log lines into a single event based on a common field value. In your case, the common field would be a combination of @timestamp, hostname and program_name.
Since the syslog input already parses the syslog lines properly, we don't need to grok anything, so we can use the aggregate filter immediately. We aggregate the lines on the part of each line matched by the SYSLOGBASE2 pattern (timestamp, hostname and program), that is, everything up to the colon character (:). Then we simply gather all the messages and finally join them into a single string. It goes like this:
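input {
  syslog {
    ...
  }
}
filter {
  aggregate {
    # Group lines that share timestamp, host and program (the part matched by SYSLOGBASE2).
    # Assumes the field names the syslog input sets with ECS compatibility disabled; adjust if yours differ.
    task_id => "%{timestamp} %{logsource} %{program}"
    # Collect the message of every line that belongs to the same task
    code => "map['message'] ||= []; map['message'].push(event.get('message'));"
    # There is no explicit end event, so emit the map as a new event when the task times out
    push_map_as_event_on_timeout => true
    # Field on the emitted event that receives the task id
    timeout_task_id_field => "user_id"
    timeout => 1 # 1 second timeout
    timeout_tags => ['_aggregatetimeout']
    # Join the collected messages into a single string
    timeout_code => "event.set('message', map['message'].join(' '))"
  }
}
output {
  ...
}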
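To check what this grouping should produce for the three sample lines, here is a minimal plain-Ruby sketch of the same idea, run outside Logstash. It does not use any Logstash API: the regular expression `SYSLOG_LINE` and the helper `aggregate_syslog` are hypothetical names made up for this illustration, and the regex is only a rough stand-in for what SYSLOGBASE2 matches.

```ruby
# Rough approximation of a syslog line: timestamp, host, program, optional [pid], message.
# Hypothetical pattern for illustration only, not the actual SYSLOGBASE2 definition.
SYSLOG_LINE = /\A(?<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (?<host>\S+) (?<program>[^:\[\s]+)(?:\[\d+\])?: (?<message>.*)\z/

# Groups lines by (timestamp, host, program) and joins their messages with a space,
# mirroring what the `code` and `timeout_code` options do in the filter.
def aggregate_syslog(lines)
  groups = {} # Ruby hashes preserve insertion order
  lines.each do |line|
    m = SYSLOG_LINE.match(line)
    next unless m # skip lines that do not look like syslog
    key = [m[:timestamp], m[:host], m[:program]]
    (groups[key] ||= []) << m[:message]
  end
  groups.map do |(timestamp, host, program), messages|
    "#{timestamp} #{host} #{program}: #{messages.join(' ')}"
  end
end

lines = [
  "Jul 19 10:47:21 host-abc systemd: Started myservice",
  "Jul 19 10:47:29 host-abc systemd: Started service.",
  "Jul 19 10:47:29 host-abc systemd: Starting service..."
]

puts aggregate_syslog(lines)
```

The first line stays on its own because its timestamp differs, while the two 10:47:29 lines end up in one message. Unlike this sketch, the filter only emits the combined event after the timeout expires.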