Tags: elasticsearch, apache-kafka, logstash

How to reuse an added field in the output with Logstash


My use case is simple. I have Kafka as input and several indexes in Elasticsearch (topic name === index name), where the index names match the entities we use in our application, such as "buildings", "cars", and "bus" (just as examples).

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    if [@metadata][kafka][topic] == 'cars' {
        mutate {
            convert => {
                "car_id" => "integer"
            }
            add_field => {
                'id' => "%{car_id}"
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        mutate {
            convert => {
                "bus_id" => "integer"
            }
            add_field => {
                'id' => "%{bus_id}"
            }
        }
    }
}

output {
    if [@metadata][kafka][topic] == 'cars' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{car_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{car_id}'
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{bus_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{bus_id}'
            }
        }
    }
}

To update or delete documents from Logstash, I need to use their id. But as you can see, I don't want to end up with 50 conditions, one per entity; I'd rather factor this out.

I would like to reuse the "id" field I create in the filter section, so the output can use it as the document_id.

Do you have any idea how I could do this?


Solution

  • You can do it like this:

    input {
        kafka {
            bootstrap_servers => "kafka:29092"
            topics => ['cars', 'bus']
            decorate_events => true
            codec => 'json'
        }
    }
    
    filter {
    translate {
        # map the topic name to the NAME of the field that holds the entity id
        source => "[@metadata][kafka][topic]"
        target => "[@metadata][id_field]"
        dictionary => {
            "cars" => "car_id"
            "bus" => "bus_id"
        }
        fallback => "no_id"
    }
    ruby {
        # double lookup: [@metadata][id_field] holds a field name (e.g. "car_id");
        # read that name, then copy that field's value into a common "id" field
        code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
    }
    }
    
    output {
        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{id}'
            }
        } else {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                doc_as_upsert => true
                action => 'update'
                document_id => '%{id}'
            }
        }
    }
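
  • How it works: translate maps the topic name to the name of the field that carries the entity id and stores it in [@metadata][id_field]; the ruby filter then reads that field name and copies its value into a common "id" field, so the output only ever references %{id}. Supporting a new entity becomes one extra dictionary line instead of a whole new output branch.

  • A quick way to sanity-check the lookup without Kafka or Elasticsearch is a throwaway pipeline. This is only a sketch: the generator event and the hard-coded topic value are made-up stand-ins for a real Kafka message, and it assumes the logstash-filter-translate plugin is available (it ships with the default distribution):

    input {
        generator {
            # fake "cars" event; a real one would come from Kafka with
            # decorate_events => true filling in [@metadata][kafka][topic]
            lines => ['{"car_id": 42, "isDelete": false}']
            count => 1
            codec => 'json'
            add_field => { "[@metadata][kafka][topic]" => "cars" }
        }
    }

    filter {
        translate {
            source => "[@metadata][kafka][topic]"
            target => "[@metadata][id_field]"
            dictionary => { "cars" => "car_id" }
            fallback => "no_id"
        }
        ruby {
            code => "event.set('id', event.get(event.get('[@metadata][id_field]')))"
        }
    }

    output {
        # print the event including @metadata so you can see id_field and id
        stdout { codec => rubydebug { metadata => true } }
    }

  • One caveat: for a topic that is missing from the dictionary, the fallback sets [@metadata][id_field] to "no_id", so "id" ends up nil and the resulting document_id is useless (it may even render literally as %{id}). You may want to drop such events, or route them elsewhere, before they reach Elasticsearch.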