
Logstash dead letter queue: "Event previously submitted to dead letter queue. Skipping"


I am trying to enable the Logstash dead letter queue, but I am getting this warning message and Elasticsearch is not receiving the logs from Logstash.

[WARN ][org.logstash.common.io.DeadLetterQueueWriter][main] Event previously submitted to dead letter queue. Skipping...

The scenario I am testing:

First, limit the Elasticsearch shard count so that the cluster cannot create a new shard and the incoming log lands in the dead letter queue:

curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "3" } }'
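
To confirm the cluster is actually at the cap, the cluster health API reports the active shard count (same host as above):

curl -s 'http://elasticsearch:9200/_cluster/health?pretty'

Once active_shards is at the limit, the next index creation should be rejected with a 400-level validation error, which is what gets routed to the dead letter queue.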

Then send a log to Logstash from RabbitMQ:

{"index":"12345","id":1,"message":"hello world"}
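
For a quick manual test, the same message can be published with rabbitmqadmin, assuming the RabbitMQ management plugin is available (the exchange and routing key match the rabbitmq input in logstash.conf below):

rabbitmqadmin publish exchange=log routing_key=log payload='{"index":"12345","id":1,"message":"hello world"}'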

Then increase the shard limit:

curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "10" } }'
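
Reading the settings back confirms the change:

curl -s 'http://elasticsearch:9200/_cluster/settings?pretty'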

When I check the dead letter queue through the Logstash API I can see the log in the queue, but Logstash is not resending it to Elasticsearch.
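
For reference, one way to inspect the queue is the Logstash monitoring API (port 9600 by default); when the DLQ is enabled, the node stats report its size on disk:

curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'

A non-zero queue_size_in_bytes under the pipeline's dead_letter_queue object confirms events are being written to the queue.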

logstash.yml:

pipeline:
  batch:
    size: 125
    delay: 5
dead_letter_queue.enable: true
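
For completeness, the DLQ path and on-disk size cap can also be set in logstash.yml. The values below are the documented defaults for a standard Docker layout, shown here as an assumption rather than taken from the setup above:

# Optional: the path must match the dead_letter_queue input's path in logstash.conf
dead_letter_queue.max_bytes: 1024mb
path.dead_letter_queue: /usr/share/logstash/data/dead_letter_queue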

logstash.conf

input {
    udp {
        port => "${INPUT_UDP_PORT}"
        type => syslog
        codec => json
    }
    tcp {
        port => "${INPUT_TCP_PORT}"
        type => syslog
        codec => json_lines
    }
    http {
        port => "${INPUT_HTTP_PORT}"
        codec => "json"
    }
    rabbitmq {
        host => "${RABBIT_MQ_HOST}"
        queue => "log"
        durable => true
        exchange => "log"
        key => "log"
        threads => 3
        prefetch_count => 50
        port => 5672
        user => "${RABBIT_MQ_USERNAME}"
        password => "${RABBIT_MQ_PASSWORD}"
        type => "log"
    }
    dead_letter_queue {
        path => "/usr/share/logstash/data/dead_letter_queue"
        commit_offsets => false
        pipeline_id => "main"
    }
}

filter {
    if [logger_name] =~ "metrics" {
        kv {
            source => "message"
            field_split_pattern => ", "
            prefix => "metric_"
        }
        mutate {
            convert => {
                "metric_value"     => "float"
                "metric_count"     => "integer"
                "metric_min"       => "float"
                "metric_max"       => "float"
                "metric_mean"      => "float"
                "metric_stddev"    => "float"
                "metric_median"    => "float"
                "metric_p75"       => "float"
                "metric_p95"       => "float"
                "metric_p98"       => "float"
                "metric_p99"       => "float"
                "metric_p999"      => "float"
                "metric_mean_rate" => "float"
                "metric_m1"        => "float"
                "metric_m5"        => "float"
                "metric_m15"       => "float"
            }
            # No need to keep the message field after it has been parsed
            remove_field => ["message"]
        }
    }
    if [type] == "syslog" {
        mutate {
            add_field => { "instance_name" => "%{app_name}-%{host}:%{app_port}" }
        }
    }
    mutate {
        # workaround from https://github.com/elastic/logstash/issues/5115
        add_field => { "[@metadata][LOGSTASH_DEBUG]" => "${LOGSTASH_DEBUG:false}" }
    }
    mutate {
      remove_field => "[geoip][location]"
    }
}

output {
    if [logger_name] =~ "metrics" {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
            index => "metrics-%{+YYYY.MM.dd}"
        }
    } else {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
            index => "logs-%{+YYYY.MM.dd}"
        }
    }
    if [@metadata][LOGSTASH_DEBUG] == "true" {
        stdout {
            codec => rubydebug
        }
    }

}

Elasticsearch and Logstash are both version 7.9.2.


Solution

  • "Event previously submitted to dead letter queue. Skipping..." indicates that an event from the DLQ input (specifically an event that includes the DLQ metadata) has been sent to the elasticsearch output and the output has tried to write it to the DLQ again. It skips it because it makes no sense to endlessly loop retrying the same rejected event.

    The DLQ documentation says: "The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried." You have to modify the event, based on the reason recorded in the DLQ metadata, before sending it to Elasticsearch again; a sketch of such a replay pipeline follows.
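
    A minimal sketch of such a replay pipeline, assuming it runs as a separate pipeline so the dead_letter_queue input does not feed straight back into the outputs that rejected the event; the replay index name is an illustrative assumption, not part of the original setup:

    input {
        dead_letter_queue {
            path => "/usr/share/logstash/data/dead_letter_queue"
            # Commit offsets so already-replayed events are not re-read from
            # the start of the queue on every restart
            commit_offsets => true
            pipeline_id => "main"
        }
    }

    filter {
        mutate {
            # The dead_letter_queue input records why the event was rejected
            # in [@metadata][dead_letter_queue][reason]; copy it onto the
            # event so the failure can still be inspected after reindexing
            add_field => { "failure_reason" => "%{[@metadata][dead_letter_queue][reason]}" }
        }
    }

    output {
        elasticsearch {
            hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
            # A dedicated index, so whatever rejected the event on the
            # original index cannot reject it a second time
            index => "dlq-replay-%{+YYYY.MM.dd}"
        }
    }

    Note that in the question's configuration the dead_letter_queue input shares the main pipeline with the live inputs, so a replayed event that is rejected again is simply skipped rather than re-queued, which is exactly the warning being logged.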