
Logstash: Import a comma-separated string from MySQL into Elasticsearch as an array


I'm trying to insert a comma-separated string (the result of GROUP_CONCAT) into Elasticsearch as an array. I use the JDBC input, and the output of the SQL query looks like this:

+---------+-----------+------------+--------------------------+-------------+---------------------+---------+------------+----------+---------------------+-------------+---------+----------------------------------------+
| network | post_dbid | host_dbid  | host_netid               | post_netid  | published           | n_likes | n_comments | language | indexed             | n_harvested | country | vrt                                    |
+---------+-----------+------------+--------------------------+-------------+---------------------+---------+------------+----------+---------------------+-------------+---------+----------------------------------------+
| xxx     | 2_xxx     | 60480_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2017-12-28 08:11:58 | 5       | 0          | en       | 2018-05-30 00:00:00 | 0           | ID      | Fitness,Well-being                     |
| xxx     | 5_xxx     | 98458_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2016-09-01 11:59:14 | 2275    | 242        | ar       | 2018-05-30 00:00:00 | 0           | SA      | SmartPhones_Gadgets                    |
| xxx     | 15_xxx    | 50884_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2018-04-23 16:36:10 | 0       | 0          | en       | 2018-05-30 00:00:00 | 0           | EG      | Fashion_Beauty                         |
| xxx     | 21_xxx    | 64118_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2015-07-01 22:50:54 | 295     | 8          | pt       | 2018-05-30 00:00:00 | 0           | BR      | Nutrition                              |
| xxx     | 24_xxx    | 9767_xxx   | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2017-05-30 02:35:29 | 10      | 1          | en       | 2018-06-18 15:32:57 | 0           | US      | Health                                 |
| xxx     | 87_xxx    | 44473_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2017-01-08 23:02:52 | 7       | 0          | en       | 2018-05-30 00:00:00 | 0           | US      | Beverages                              |
| xxx     | 99_xxx    | 120198_xxx | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2018-02-17 02:57:58 | 8       | 0          | en       | 2018-05-30 00:00:00 | 0           | US      | Food                                   |
| xxx     | 126_xxx   | 50258_xxx  | xxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxx | 2018-03-22 09:16:25 | 1       | 0          | en       | 2018-05-30 00:00:00 | 0           | IN      | Health                                 |
+---------+-----------+------------+--------------------------+-------------+---------------------+---------+------------+----------+---------------------+-------------+---------+----------------------------------------+

I used split from the mutate plugin:

filter {
  mutate {
     split => { "vrt" => "," }
  }
}

However, the field was still inserted as a comma-separated string:

GET xxx/_search
{
  "query": {
    "terms": {
      "_id": ["2_xxx"] 
    }
  }
}

Response:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "xxx",
        "_type": "doc",
        "_id": "2_xxx",
        "_score": 1,
        "_source": {
          "post_dbid": "2_xxx",
          "host_dbid": "60480_xxx",
          "host_netid": "xxxxxxxxxxxxxxxxxxxxxxxx",
          "n_likes": 5,
          "n_comments": 0,
          "country": "ID",
          "network": "xxx",
          "indexed": "2018-05-30T00:00:00.000Z",
          "n_harvested": 0,
          "vrt": "Fitness,Well-being",
          "@version": "1",
          "post_netid": "xxxxxxxxxxx",
          "@timestamp": "2018-06-27T15:47:24.370Z",
          "language": "en",
          "published": "2017-12-28T08:11:58.000Z"
        }
      }
    ]
  }
}

My final goal is to index vrt as an array field and build visualizations on it in Kibana. For example, I want to create a counter in Kibana that counts how many documents have "Fitness" in the vrt field.

ELK Version: 6.2.4


Solution

  • You could use the ruby filter. Here is how I do it: I define a Ruby method that splits a comma-separated string, trims each element, rejects empty elements and removes duplicates. You can then reuse the method for any comma-separated field, as below:

    filter {
      ruby {
        code => "
          # Split the supplied string on commas, trim whitespace, drop empty
          # elements and duplicates, and return the result as an array.
          def mapStringToArray(strFieldValue)
            # Return an empty array if the field is missing (nil).
            return [] if strFieldValue.nil?

            strFieldValue.split(',').map(&:strip).reject(&:empty?).uniq
          end

          vrtArr = mapStringToArray(event.get('vrt'))
          # Only overwrite the field when at least one element remains.
          if vrtArr.length > 0
            event.set('vrt', vrtArr)
          end
        "
      }
    }
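
To check the splitting logic without running a whole pipeline, the same chain of calls can be tried in plain Ruby. This is only a sketch: `map_string_to_array` is a hypothetical standalone name for the method in the filter, and the sample input is made up for illustration.

```ruby
# Hypothetical standalone version of the helper used in the ruby filter,
# so the behaviour can be checked outside Logstash.
def map_string_to_array(value)
  # A missing field (nil) becomes an empty array.
  return [] if value.nil?

  value.split(',')       # split on commas
       .map(&:strip)     # trim surrounding whitespace
       .reject(&:empty?) # drop empty elements such as the one in "a,,b"
       .uniq             # remove duplicates, keeping first occurrences
end

# Made-up sample input with extra spaces, an empty element and a duplicate.
p map_string_to_array(' Fitness, Well-being,,Fitness ')
# prints ["Fitness", "Well-being"]
```

Once vrt is indexed as an array, a document tagged "Fitness,Well-being" should count towards both values when you aggregate on the field in Kibana.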