Search code examples
elasticsearchlogstashlogstash-jdbc

Logstash error when converting MySQL value to nested elasticsearch property on suggestion field


A Huge cry for help here, When i try to convert a MySQL value to a nested elasticsearch field using logstash i get the following error.

{"exception"=>"expecting List or Map, found class org.logstash.bivalues.StringBiValue", "backtrace"=>["org.logstash.Accessors.newCollectionException(Accessors.java:195)"

Using the following config file:

input {
    jdbc {
        jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
        jdbc_user => "username"
        jdbc_password => "password"
        statement => "SELECT id, suggestions, address_count FROM `suggestions` WHERE id <= 100"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
}
filter {
  mutate {
  rename => { 'address_count' => '[suggestions][payload][count]' }
  }
}
output {
    elasticsearch {
    hosts => [
        "localhost:9200"
    ]
        index => "dev_suggestions"
        document_type => "address"
    }
}

However if i rename address_count to a field that is not already in my mapping, Then it works just fine and it correctly adds the value as a nested property, I have tried on other fields in my index and not just suggestions.payloads.address_count and i get the same issue, It only works if the field has not been defined in the mapping.

This has caused me some serious headaches and if anyone could help me out to overcome this issue i would greatly appreciate it as Ive spent the last 48 hours banging my head on the table!

I initially assumed i could do the following with a MySQL query:

SELECT id, suggestion, '[suggestions][payload][count]' FROM `suggestions` WHERE id <= 100

Then i also tried

SELECT id, suggestion, 'suggestions.payload.count' FROM `suggestions` WHERE id <= 100

Both failed to insert the value with the later option giving an error that a field can not contain dots.

And finally the mapping:

{
  "mappings": {
    "address": {
      "properties": {
        "suggestions": {
          "type": "completion",
          "payloads" : true
        }
      }
    }
  }
}

Thanks to Val - and for future users in the same situation as myself that need to convert MySQL data into nested Elasticsearch objects using logstash, Here is a working solution using Logstash 5 and Elasticsearch 2.*

input {
    jdbc {
        jdbc_driver_library => "/logstash/mysql-connector-java-5.1.42-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
        jdbc_user => "username"
        jdbc_password => "password"
        statement => "SELECT addrid, suggestion, address_count FROM `suggestions` WHERE id <= 20"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
    }
}

filter {
  ruby {
       code => "
           event.set('[suggestions][input]', event.get('suggestion'))
           event.set('[suggestions][payload][address_count]', event.get('address_count'))
           event.set('[v][payload][id]', event.get('addrid'))
       "
       remove_field => [ 'suggestion', 'address_count', 'addrid' ]
  }
}

output {
    elasticsearch {
        hosts => [
            "localhost:9200"
        ]
        index => "dev_suggestions"
        document_type => "address"
    }
}

Solution

  • I think you need to proceed differently. First, I would rename the suggestions field in your SQL query to something else and then build the suggestions object from the values you get from your SQL query.

        statement => "SELECT id, suggestion, address_count FROM `suggestions` WHERE id <= 100"
    

    Then you could use a ruby filter (and remove your mutate one) in order to build your suggestions field, like this:

    Logstash 2.x code:

    ruby {
         code => "
             event['suggestions']['input'] = event['suggestion']
             event['suggestions']['payload']['count'] = event['address_count']
         "
         remove_field => [ 'suggestion', 'address_count' ]
    }
    

    Logstash 5.x code:

    ruby {
         code => "
             event.set('[suggestions][input]', event.get('suggestion'))
             event.set('[suggestions][payload][count]', event.get('address_count'))
         "
         remove_field => [ 'suggestion', 'address_count' ]
    }
    

    PS: All this assumes you're using ES 2.x since the payload field has disappeared in ES 5.x