Search code examples
elasticsearchlogstash-configuration

logstash 5.0.1: setup elasticsearch multiple indexes ouput for multiple kafka input topics


I have a logstash input setup as

input {
  kafka {
  bootstrap_servers => "zookeper_address"
  topics => ["topic1","topic2"]
  }
}

I need to feed the topics into two different indexes in elasticsearch. Can anyone help me with how the ouput should be setup for such a task. At this time I am only able to setup

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
  }
}

I need two indexes on the same elasticsearch instance say index1 and index2 which will be fed by messages coming in on topic1 and topic2


Solution

  • First, you need to add decorate_events to your kafka input in order to know from which topic the message is coming

    input {
      kafka {
        bootstrap_servers => "zookeper_address"
        topics => ["topic1","topic2"]
        decorate_events => true
      }
    }
    

    Then, you have two options, both involving conditional logic. The first is by introducing a filter for adding the correct index name depending on the topic name. For this you need to add

    filter {
       if [kafka][topic] == "topic1" {
          mutate {
             add_field => {"[@metadata][index]" => "index1"}
          }
       } else {
          mutate {
             add_field => {"[@metadata][index]" => "index2"}
          }
       }
       # remove the field containing the decorations, unless you want them to land into ES
       mutate {
          remove_field => ["kafka"]
       }
    }
    output {
      elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{[@metadata][index]}"
        codec => "json"
        document_id => "%{id}"
      }
    }
    

    Then second option is to do the if/else directly in the output section, like this (but the additional kafka field will land into ES):

    output {
       if [@metadata][kafka][topic] == "topic1" {
         elasticsearch {
           hosts => ["localhost:9200"]
           index => "index1"
           codec => "json"
           document_id => "%{id}"
         }
       } else {
         elasticsearch {
           hosts => ["localhost:9200"]
           index => "index2"
           codec => "json"
           document_id => "%{id}"
         }
       }
    }