Given the following Logstash pipeline:
input {
  generator {
    lines => [
      '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
      '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
      '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
      '{"hello": "world"}',
      '{"name" :"wrong data", "data" : "I am wrong !"}',
      '{"name" :"wrong data", "data" : { "hello" : "world" }}'
    ]
    codec => json
    count => 1
  }
}
filter {
  mutate {
    remove_field => ["sequence", "host", "@version"]
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "events-dev6-test"
    document_type => "_doc"
    manage_template => false
  }
  stdout {
    codec => rubydebug
  }
}
Elasticsearch has a strict mapping for this index, so some events fail with a 400 error: "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed" (which is expected).
How can I send the failed events elsewhere (to text logs or to another Elasticsearch index) so I don't lose them?
Logstash 6.2 introduced Dead Letter Queues, which can be used to do what you want. You'll need to set dead_letter_queue.enable: true in your logstash.yml.
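A minimal logstash.yml sketch (the path setting is optional; by default the queue files are written under a dead_letter_queue directory inside path.data):

```
# Enable the dead letter queue. As of this writing it only captures
# events rejected by the elasticsearch output (400/404 responses).
dead_letter_queue.enable: true

# Optional: where the queue files are written
# (defaults to <path.data>/dead_letter_queue).
path.dead_letter_queue: "/path/to/data/dead_letter_queue"
```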
And then just deal with it as an input:
input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"
    commit_offsets => true
    pipeline_id => "main"
  }
}
output {
  file {
    path => ...
    codec => line { format => "%{message}" }
  }
}
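If you'd rather keep the failed events in Elasticsearch instead of a file, the same dead_letter_queue input can feed a second elasticsearch output pointed at a separate index. A sketch (the events-dev6-failed index name is just an example; use an index with a lenient or dynamic mapping so the rejected documents are accepted as-is):

```
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    # Example name for a catch-all index without a strict mapping,
    # so documents rejected by events-dev6-test can be stored as-is.
    index => "events-dev6-failed"
    manage_template => false
  }
}
```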
Prior to 6.2, I don't believe there was a built-in way to do this.