I'm using the ELK stack. A simple Java app writes log files every 5 minutes, and Filebeat ships them to Logstash. Because the files get overwritten, the same messages are indexed again (their fingerprints are identical); the only difference is the document id, since Elasticsearch assigns a new id every time a document is re-indexed. How can I get rid of the duplicates or keep the document id the same?
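As I understand it, the usual fix is to derive a deterministic id from the message content (a fingerprint) and use it as the Elasticsearch document id, so re-indexing the same line overwrites the old document instead of creating a new one. A minimal sketch of that pattern (the index name here is illustrative; my real config follows):

filter {
  fingerprint {
    method => "SHA1"
    source => ["message"]    # identical lines produce identical fingerprints
  }
}
output {
  elasticsearch {
    hosts => ["localhost"]
    index => "myindex-%{+YYYY.MM.dd}"    # illustrative index name
    document_id => "%{fingerprint}"      # same fingerprint => same id => no duplicate
  }
}

This is what I'm trying to do below, but the duplicates keep showing up.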
Logstash input and filter:
input {
  beats {
    port => 5044
    ssl => false
    ssl_certificate => "/etc/pki/tls/certs/logstash-beats.crt"
    ssl_key => "/etc/pki/tls/private/logstash-beats.key"
    client_inactivity_timeout => 200
  }
}
filter {
  if [fields][log_type] == "access" {
    grok {
      match => { "message" => "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{WORD:servername}/%{NOTSPACE:requestpage} HTTP/%{NUMBER:http_version}\" %{NUMBER:server_response} %{NUMBER:answer_size}" }
    }
  } else if [fields][log_type] == "errors" {
    grok {
      match => { "message" => "%{DATESTAMP:maximotime}(.*)SystemErr" }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "dd.MM.yy HH:mm:ss:SSS"]
    }
    mutate {
      copy => { "message" => "key" }
    }
    mutate {
      gsub => [
        "message", ".*SystemErr R ", "",
        "key", ".*SystemErr R", ""
      ]
    }
    truncate {
      fields => "key"
      length_bytes => 255
    }
    fingerprint {
      method => "SHA1"
      source => ["key"]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  } else if [fields][log_type] == "info" {
    grok {
      match => { "message" => [
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \(выполнение заняло %{NUMBER:execution} миллисекунд\) \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \(выполнение заняло %{NUMBER:execution} миллисекунд\)"
      ] }
      add_field => {
        "type" => "conditions"
      }
    }
    mutate {
      convert => {
        "execution" => "integer"
      }
    }
    fingerprint {
      method => "SHA1"
      source => ["message"]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:maximotime} (.*)getMboCount %{WORD:object}: mbosets \(%{WORD:mbosets}\), mbos \(%{WORD:mbos}\)" }
        add_field => {
          "type" => "maximoObjectCount"
        }
        remove_tag => ["_grokparsefailure"]
      }
      mutate {
        convert => {
          "mbosets" => "integer"
          "mbos" => "integer"
        }
      }
      fingerprint {
        method => "SHA1"
        source => ["message"]
      }
      if "_grokparsefailure" in [tags] {
        drop { }
      }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "yyyy-MM-dd HH:mm:ss:SSS"]
      target => "maximotime"
    }
  }
}
Logstash output:
output {
  stdout { codec => rubydebug }
  if [fields][log_type] == "access" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  } else if [fields][log_type] == "errors" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    }
  } else if [fields][log_type] == "info" {
    elasticsearch {
      hosts => ["localhost"]
      manage_template => false
      index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
      document_id => "%{fingerprint}"
    }
  }
}
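One thing I notice: only the info output pins the document id to the fingerprint. The errors pipeline computes a fingerprint too, but its elasticsearch output still lets Elasticsearch autogenerate ids, so presumably it needs the same line:

document_id => "%{fingerprint}"

Is that all that's missing, or is something else wrong?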
Filebeat.yml:
filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
  - add_cloud_metadata: ~

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/integration/*.log
    fields: {log_type: access}
  - type: log
    enabled: true
    paths:
      - /var/log/maximo_error_logs/*.log
    fields: {log_type: errors}
    exclude_lines: ['^((\*+)|Log file started at:)']
    multiline.pattern: '(^$|(\t|\s)at .*|.*Caused by:.*|.*SystemErr( ){5}R[ \t]{2}at .*|^ru.ocrv..*|^(\s|\t|)null.*|Обратитесь за.*|.*Закрытое со.*|^(\s|\t|)(ORA-.*|BMX.*)|^(\\s|\t)[А-Яа-я].*)|(.*\d more$)'
    multiline.negate: false
    multiline.match: after
  - type: log
    enabled: true
    paths:
      - /var/log/maximo_logs/*.log
    fields: {log_type: info}

output.logstash:
  hosts: ["elk:5044"]
  bulk_max_size: 200
I'm dumb. I was restarting the Filebeat container instead of the ELK one, so my Logstash config changes were never being applied... Now it works, and my Logstash output config looks like this:
document_id => "%{type}-%{fingerprint}"
action => "create"
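For anyone hitting the same thing: the "%{type}-%{fingerprint}" id prefixes the fingerprint with the event type, so events from different grok branches can't collide on the same hash, and action => "create" makes Elasticsearch reject any document whose id already exists (with a 409 version conflict) instead of re-indexing it. In context, the info output block now reads:

elasticsearch {
  hosts => ["localhost"]
  manage_template => false
  index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
  document_type => "%{[@metadata][type]}"
  document_id => "%{type}-%{fingerprint}"    # type prefix keeps ids unique across log types
  action => "create"                         # existing ids are rejected, not overwritten
}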