I am trying to enable the Logstash dead letter queue, but I get this warning message and Elasticsearch does not receive the logs from Logstash.
[WARN ][org.logstash.common.io.DeadLetterQueueWriter][main] Event previously submitted to dead letter queue. Skipping...
The scenario I am testing:
First I limit the number of Elasticsearch shards so that it cannot create a new shard and the log ends up in the dead letter queue:
curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "3" } }'
Then I send a log to Logstash from RabbitMQ:
{index:"12345",id:1,message:"hello world"}
Increasing the shard limit:
curl -X PUT 'http://elasticsearch:9200/_cluster/settings' -H "Content-Type: application/json" -d '{ "persistent": { "cluster.max_shards_per_node": "10" } }'
When I check the dead letter queue through the Logstash API, I can see the log in the queue, but Logstash does not send it to Elasticsearch.
logstash.yml:
pipeline:
  batch:
    size: 125
    delay: 5
dead_letter_queue.enable: true
logstash.conf:
input {
  udp {
    port => "${INPUT_UDP_PORT}"
    type => syslog
    codec => json
  }
  tcp {
    port => "${INPUT_TCP_PORT}"
    type => syslog
    codec => json_lines
  }
  http {
    port => "${INPUT_HTTP_PORT}"
    codec => "json"
  }
  rabbitmq {
    host => "${RABBIT_MQ_HOST}"
    queue => "log"
    durable => true
    exchange => "log"
    key => "log"
    threads => 3
    prefetch_count => 50
    port => 5672
    user => "${RABBIT_MQ_USERNAME}"
    password => "${RABBIT_MQ_PASSWORD}"
    type => "log"
  }
  dead_letter_queue {
    path => "/usr/share/logstash/data/dead_letter_queue"
    commit_offsets => false
    pipeline_id => "main"
  }
}
filter {
  if [logger_name] =~ "metrics" {
    kv {
      source => "message"
      field_split_pattern => ", "
      prefix => "metric_"
    }
    mutate {
      convert => { "metric_value" => "float" }
      convert => { "metric_count" => "integer" }
      convert => { "metric_min" => "float" }
      convert => { "metric_max" => "float" }
      convert => { "metric_mean" => "float" }
      convert => { "metric_stddev" => "float" }
      convert => { "metric_median" => "float" }
      convert => { "metric_p75" => "float" }
      convert => { "metric_p95" => "float" }
      convert => { "metric_p98" => "float" }
      convert => { "metric_p99" => "float" }
      convert => { "metric_p999" => "float" }
      convert => { "metric_mean_rate" => "float" }
      convert => { "metric_m1" => "float" }
      convert => { "metric_m5" => "float" }
      convert => { "metric_m15" => "float" }
      # No need to keep message field after it has been parsed
      remove_field => ["message"]
    }
  }
  if [type] == "syslog" {
    mutate {
      add_field => { "instance_name" => "%{app_name}-%{host}:%{app_port}" }
    }
  }
  mutate {
    # workaround from https://github.com/elastic/logstash/issues/5115
    add_field => { "[@metadata][LOGSTASH_DEBUG]" => "${LOGSTASH_DEBUG:false}" }
  }
  mutate {
    remove_field => "[geoip][location]"
  }
}
output {
  if [logger_name] =~ "metrics" {
    elasticsearch {
      hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
      index => "metrics-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
      index => "logs-%{+YYYY.MM.dd}"
    }
  }
  if [@metadata][LOGSTASH_DEBUG] == "true" {
    stdout {
      codec => rubydebug
    }
  }
}
Elasticsearch and Logstash are both version 7.9.2.
"Event previously submitted to dead letter queue. Skipping..." indicates that an event from the DLQ input (specifically an event that includes the DLQ metadata) has been sent to the elasticsearch output and the output has tried to write it to the DLQ again. It skips it because it makes no sense to endlessly loop retrying the same rejected event.
The DLQ documentation says: "The dead letter queue is used for documents with response codes of 400 or 404, both of which indicate an event that cannot be retried." You have to modify the event based on the reason documented in the DLQ metadata before trying to send it to Elasticsearch again.
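As a rough illustration of that last step, a dedicated reprocessing pipeline might look something like the sketch below. It reuses the DLQ path, pipeline id, host variables and index pattern from the question. The `dlq_reason` field name, the `mapper_parsing_exception` check and the `offending_field` removal are hypothetical placeholders, because the real fix depends on whatever reason was recorded in `[@metadata][dead_letter_queue][reason]` for your events.

```
# Sketch only: a separate pipeline that re-reads the DLQ written by "main".
input {
  dead_letter_queue {
    path => "/usr/share/logstash/data/dead_letter_queue"
    pipeline_id => "main"
    # Remember the read position so entries are not re-read after a restart.
    commit_offsets => true
  }
}
filter {
  # Copy the rejection reason out of @metadata so it is stored with the event.
  # "dlq_reason" is a hypothetical field name.
  mutate {
    add_field => { "dlq_reason" => "%{[@metadata][dead_letter_queue][reason]}" }
  }
  # Hypothetical repair: adjust the event according to why it was rejected.
  if [@metadata][dead_letter_queue][reason] =~ "mapper_parsing_exception" {
    mutate {
      remove_field => ["offending_field"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["${ELASTICSEARCH_HOST}:${ELASTICSEARCH_PORT}"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}
```

Keeping the `dead_letter_queue` input in its own pipeline, rather than next to the other inputs in `main`, makes it easier to apply the repair only to previously rejected events. If Elasticsearch rejects a reprocessed event again, the output will still log the "Skipping..." warning rather than queue it a second time.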