Tags: elasticsearch, logstash, filebeat, elk

Elasticsearch: Duplicates caused by overwriting log files


I'm using the ELK stack. Log files are overwritten every 5 minutes by a simple Java app, and Filebeat ships them to Logstash. Because of the overwriting, the same messages get indexed again and again (their fingerprints are identical); the only difference is the document id, since Elasticsearch assigns a new id every time a document is re-ingested. How can I get rid of the duplicates, or keep the document id the same?
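
For reference, the usual technique is to derive the Elasticsearch document id from the event content with the fingerprint filter, so that a re-shipped line always maps to the same _id and updates the existing document instead of creating a new one. A minimal sketch (index name is illustrative, not my real config):

    filter {
      fingerprint {
        method => "SHA1"
        source => ["message"]
        # the hash is written to the "fingerprint" field by default
      }
    }
    output {
      elasticsearch {
        hosts => ["localhost"]
        index => "some-index-%{+YYYY.MM.dd}"   # illustrative
        # identical lines get the same _id, so re-ingesting them overwrites
        # the stored document instead of adding a duplicate
        document_id => "%{fingerprint}"
      }
    }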

Logstash input and filter:

input {
  beats {
    port => 5044
    ssl => false
    ssl_certificate => "/etc/pki/tls/certs/logstash-beats.crt"
    client_inactivity_timeout => 200
    ssl_key => "/etc/pki/tls/private/logstash-beats.key"
  }
}
filter {
  if [fields][log_type] == "access" {
    grok {
      match => [ "message", "%{IP:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:apache_timestamp}\] \"%{WORD:method} /%{WORD:servername}/%{NOTSPACE:requestpage} HTTP/%{NUMBER:http_version}\" %{NUMBER:server_response} %{NUMBER:answer_size}" ]
    }
  }
  else if [fields][log_type] == "errors" {
    grok {
      match => { "message" => "%{DATESTAMP:maximotime}(.*)SystemErr" }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "dd.MM.yy HH:mm:ss:SSS"]
    }
    mutate {
      copy => { "message" => "key" }
    }
    mutate {
      gsub => [
        "message", ".*SystemErr     R ", "",
        "key", ".*SystemErr     R", ""
      ]
    }
    truncate {
      fields => "key"
      length_bytes => 255
    }
    fingerprint {
      method => "SHA1"
      source => ["key"]
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
  } else if [fields][log_type] == "info" {
    grok {
      match => { "message" => [
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\) \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql} \{conditions:%{GREEDYDATA:conditions}\}",
        "%{TIMESTAMP_ISO8601:maximotime}.* ПОЛЬЗОВАТЕЛЬ = \(%{WORD:username}.*программа \(%{WORD:appname}\).*объект \(%{WORD:object}\).*: %{GREEDYDATA:sql}  \(выполнение заняло %{NUMBER:execution} миллисекунд\)"
      ] }
      add_field => {
        "type" => "conditions"
      }
    }
    mutate {
      convert => {
        "execution" => "integer"
      }
    }
    fingerprint {
      method => "SHA1"
      source => ["message"]
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:maximotime} (.*)getMboCount %{WORD:object}: mbosets \(%{WORD:mbosets}\), mbos \(%{WORD:mbos}\)" }
        add_field => {
          "type" => "maximoObjectCount"
        }
        remove_tag => ["_grokparsefailure"]
      }
      mutate {
        convert => {
          "mbosets" => "integer"
          "mbos" => "integer"
        }
      }
      fingerprint {
        method => "SHA1"
        source => ["message"]
      }
      if "_grokparsefailure" in [tags] {
        drop { }
      }
    }
    date {
      timezone => "Europe/Moscow"
      match => ["maximotime", "yyyy-MM-dd HH:mm:ss:SSS"]
      target => "maximotime"
    }
  }
}

Logstash output:

output {
    stdout {codec => rubydebug}
    if [fields][log_type] == "access" {
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    } else if [fields][log_type] == "errors"{
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    } else if [fields][log_type] == "info"{
        elasticsearch {  
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
            document_id => "%{fingerprint}"
        }
    }
}
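
Note that only the "info" output sets document_id; the "access" and "errors" outputs let Elasticsearch generate the id, so those indices keep collecting duplicates even though the errors branch already computes a fingerprint. A sketch of the errors output reusing that fingerprint (same settings otherwise):

    else if [fields][log_type] == "errors" {
        elasticsearch {
            hosts => ["localhost"]
            manage_template => false
            index => "%{[@metadata][beat]}-error-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
            # reuse the SHA1 fingerprint computed in the errors filter branch
            document_id => "%{fingerprint}"
        }
    }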

Filebeat.yml:

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

processors:
- add_cloud_metadata: ~

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/integration/*.log
  fields:  {log_type: access}
- type: log
  enabled: true
  paths:
    - /var/log/maximo_error_logs/*.log
  fields:  {log_type: errors}
  exclude_lines: '^((\*+)|Log file started at:)'
  multiline.pattern: '(^$|(\t|\s)at .*|.*Caused by:.*|.*SystemErr( ){5}R[ \t]{2}at .*|^ru.ocrv..*|^(\s|\t|)null.*|Обратитесь за.*|.*Закрытое со.*|^(\s|\t|)(ORA-.*|BMX.*)|^(\\s|\t)[А-Яа-я].*)|(.*\d more$)'
  multiline.negate: false
  multiline.match: after
- type: log
  enabled: true
  paths:
    - /var/log/maximo_logs/*.log
  fields:  {log_type: info}

output.logstash:
  hosts: ["elk:5044"]
  bulk_max_size: 200

Solution

  • I'm dumb. I was restarting the Filebeat container instead of the ELK container, so my Logstash config was never being applied... It's working now, and my Logstash output config looks like this:

    document_id => "%{type}-%{fingerprint}"
    action => "create"
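
Put together, the info output ends up looking roughly like this (a sketch assembled from the snippets above):

    elasticsearch {
        hosts => ["localhost"]
        manage_template => false
        index => "%{[@metadata][beat]}-info-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"
        # the id is derived from the event itself, so a re-shipped line
        # always targets the same _id
        document_id => "%{type}-%{fingerprint}"
        # "create" makes Elasticsearch reject an event whose _id already
        # exists (409 version conflict), so duplicates are dropped instead
        # of overwriting the stored document
        action => "create"
    }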