Tags: elasticsearch, logstash, csv-import

Logstash csv import - mutate add_field if not empty


I'm using Logstash to import data from CSV files into our Elasticsearch.

During the import I want to create a new field that combines the values of other fields. Here's a snippet of my import:

    input {
      file {
        path => "/data/xyz/*.csv"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
      }
    }

    filter {
      if [path] =~ "csv1" {
        csv {
          separator => ";"
          columns => [
            "name1",
            "name2",
            "name3",
            "ID"
          ]
        }
        mutate {
          add_field => {
            "searchfield" => "%{name1} %{name2} %{name3}"
          }
        }
      }
    }

    output {
      if [path] =~ "csv1" {
        elasticsearch {
          hosts => "localhost"
          index => "my_index"
          document_id => "%{ID}"
        }
      }
    }

This works as desired, but on rows where for example name3 is empty, Logstash writes the literal %{name3} into the new field. Is there a way to only add the value if it's not empty?
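
For illustration (names made up), a row where the name3 column is missing ends up as an event roughly like this:

    {
      "name1": "John",
      "name2": "Doe",
      "searchfield": "John Doe %{name3}"
    }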


Solution

  • I think there's no way around checking whether name3 is present and, based on that, building your search field:

    if [name3] {
      mutate {
        id => "with-name3"
        add_field => { "searchfield" => "%{name1} %{name2} %{name3}" }
      }
    } else {
      mutate {
        id => "without-name3"
        add_field => { "searchfield" => "%{name1} %{name2}" }
      }
    }
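
    If you want to avoid spelling out every present/absent combination, an alternative (just a sketch, not part of the original answer) is a ruby filter that joins only the name fields that actually have a value:

    ruby {
      code => "
        # collect only the name fields that are present and non-empty
        parts = ['name1', 'name2', 'name3'].map { |f| event.get(f) }
        parts = parts.reject { |v| v.nil? || v.to_s.strip.empty? }
        event.set('searchfield', parts.join(' ')) unless parts.empty?
      "
    }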
    

    Alternatively, if I understand your issue right, you want to ship this data to Elasticsearch and have a single searchable field. To avoid duplicating the data in your source, you can build a search field using the copy_to mapping parameter. Your mappings would look as follows:

    {
      "mappings": {
        "doc": {
          "properties": {
            "name1": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "name2": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "name3": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "searchfield": {
              "type": "text"
            }
          }
        }
      }
    }
    

    You can then run your queries against that field without having duplicates in the source.

    Update: basically, your logstash.conf would look as follows:

    input {
      file {
        path => "/data/xyz/*.csv"
        start_position => "beginning"
        ignore_older => 0
        sincedb_path => "/dev/null"
      }
    }
    
    filter {
      if [path] =~ "csv1" {
        csv {
          separator => ";"
          columns => ["name1", "name2", "name3", "ID"]
        }
      }
    }
    
    output {
      if [path] =~ "csv1" {
        elasticsearch {
          hosts => "localhost"
          index => "my_index"
          document_id => "%{ID}"
        }
      }
    }
    

    Then create the Elasticsearch index using the following:

    PUT /my_index/
    {
      "mappings": {
        "doc": {
          "properties": {
            "name1": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "name2": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "name3": {
              "type": "text",
              "copy_to": "searchfield" 
            },
            "searchfield": {
              "type": "text"
            }
          }
        }
      }
    }
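
    For illustration (made-up values, not from the original data), indexing a document against that mapping populates searchfield automatically:

    PUT /my_index/doc/1
    {
      "name1": "John",
      "name2": "Doe",
      "name3": "Smith"
    }

    A match query for "john smith" on searchfield would now find this document, even though searchfield was never part of the request body.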
    

    And then you can run a search as follows:

    GET /my_index/_search
    {
      "query": {
        "match": {
          "searchfield": {
            "query": "your text"
          }
        }
      }
    }
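
    One thing to keep in mind (general copy_to behaviour, not specific to this setup): searchfield is only written into the index, not into the stored _source, so the documents you get back still contain just the fields that were sent at index time:

    GET /my_index/doc/1

    The returned _source has name1, name2, name3 (and whatever else was indexed), but no searchfield, which is exactly the "no duplicates in source" property mentioned above.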