
Logstash: send event elsewhere if output failed


Given the following Logstash pipeline:

input
{
    generator
    {
        lines => [
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"name" : "user_interaction", "product" : { "module" : "search" , "name" : "front"}, "data" : { "query" : "toto"}}',
        '{"name" : "search", "product" : { "module" : "search" , "name" : "api"}, "data" : { "query" : "toto"}}',
        '{"hello": "world"}',
        '{"name" :"wrong data", "data" : "I am wrong !"}',
        '{"name" :"wrong data", "data" : { "hello" : "world" }}'
        ]
        codec => json
        count => 1
    }
}

filter
{
  mutate
  {
    remove_field => ["sequence", "host", "@version"]
  }
}

output
{
   elasticsearch
   {
     hosts => ["elasticsearch:9200"]
     index => "events-dev6-test"
     document_type => "_doc"
     manage_template => false
   }

   stdout
   {
       codec => rubydebug
   }
}

Elasticsearch has a strict mapping for this index, so some events are rejected with a 400 error: "mapping set to strict, dynamic introduction of [hello] within [data] is not allowed" (which is expected).

How can I send failed events elsewhere (text logs or another Elasticsearch index) so I don't lose any events?


Solution

  • Logstash 6.2 introduced Dead Letter Queues, which can be used to do what you want. Note that the dead letter queue only captures documents that the elasticsearch output rejected with a 400 or 404 response, which matches your case. You'll need to set dead_letter_queue.enable: true in your logstash.yml.
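
    For reference, the relevant logstash.yml settings might look like this (the queue path is an assumption; by default it lives under path.data):

    dead_letter_queue.enable: true
    path.dead_letter_queue: "/path/to/data/dead_letter_queue"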

    And then just deal with it as an input:

    input {
      dead_letter_queue {
        path => "/path/to/data/dead_letter_queue" 
        commit_offsets => true 
        pipeline_id => "main" 
      }
    }
    
    output {
      file {
        path => ...
        codec => line { format => "%{message}" }
      }
    }
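
    Alternatively, since the question also mentions keeping failed events in Elasticsearch, you can re-index them instead of writing them to a file by pointing a second elasticsearch output at an index without the strict mapping. The index name below is illustrative:

    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        # hypothetical fallback index with a permissive mapping
        index => "events-dev6-test-failed"
        manage_template => false
      }
    }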
    

    Prior to 6.2, I don't believe there was a way to do what you want.