
Real value not recognized when sending JSON data from Kinesis Firehose to Elasticsearch


I have an issue in Kibana with a field value. I'll try to explain the situation in the following lines.

I'm sending DynamoDB Streams to Lambda, then to Kinesis Firehose, and finally from Firehose to Elasticsearch. I'm using Kibana to visualize the data, and that is where I have the issue.

Let's say that I'm sending this JSON to DynamoDB:

{
    "id": "identificator",
    "timestamp": "2017-05-09T06:38:00.337Z",
    "value": 33,
    "units": "units",
    "description": "This is the description",
    "machine": {
        "brand": "brand",
        "application": "application"
    }
}

In Lambda I receive the following:

{
    "data": {
        "M": {
            "machine": {
                "M": {
                    "application": {
                        "S": "application"
                    },
                    "brand": {
                        "S": "band"
                    }
                }
            },
            "description": {
                "S": "This is the description"
            },
            "id": {
                "S": "identificator"
            },
            "units": {
                "S": "units"
            },
            "value": {
                "N": "33"
            },
            "_msgid": {
                "S": "85209b75.f51ee8"
            },
            "timestamp": {
                "S": "2017-05-09T06:38:00.337Z"
            }
        }
    },
    "id": {
        "S": "85209b75.f51ee8"
    }
}

If I forward this last JSON to Kinesis Firehose, Kibana recognizes the "timestamp" automatically when I configure the index pattern (and that's great). The problem is that the "value" field is treated like a string and is not recognized as a number (in DynamoDB's marshalled format the number is carried as the string "33" under the "N" type descriptor).
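
To check what the automatically created index actually did with the field, the mapping can be inspected directly, for example from the Kibana Dev Tools console ("index" below is a placeholder for whatever index name Firehose created):

GET /index/_mapping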

I tried to modify the JSON and then send it again to Firehose, but then Kibana doesn't recognize the "timestamp":

{
    "data": {
        "machine": {
            "application": "application",
            "brand": "brand"
        },
        "description": "This is the description",
        "id": "identificator",
        "units": "KWh",
        "value": 33,
        "_msgid": "85209b75.f51ee8",
        "timestamp": "2017-05-09T06:38:00.337Z"
    },
    "id": "85209b75.f51ee8"
}
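
One thing worth keeping in mind here (as far as I know about Elasticsearch): once a field has been dynamically mapped, that mapping cannot be changed on the existing index. So when re-sending reshaped documents like the one above, it may be necessary to delete the old index first and refresh the index pattern in Kibana, so that the mapping is created fresh (placeholder index name again):

DELETE /index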

I would like to know how I could send this data so that Kibana recognizes both the "timestamp" and "value" fields.

This is an example of the code that I'm using in Lambda:

var AWS = require('aws-sdk');
// Imported but not used below: could be used to strip DynamoDB's
// type descriptors before forwarding the record.
var unmarshalJson = require('dynamodb-marshaler').unmarshalJson;

var firehose = new AWS.Firehose();

exports.lambda_handler = function(event, context) {

    // NewImage is still in DynamoDB's marshalled format
    // ({"S": ...}, {"N": ...}, {"M": ...}).
    var record = JSON.stringify(event.Records[0].dynamodb.NewImage);

    console.log("[INFO]:" + JSON.stringify(event.Records[0].dynamodb.NewImage));

    var params = {
        DeliveryStreamName: 'DeliveryStreamName',

        Record: {
            Data: record
        }
    };
    firehose.putRecord(params, function(err, data) {

        if (err) console.log(err, err.stack); // an error occurred
        else     console.log(JSON.stringify(data)); // successful response

        context.done();
    });
};
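
Note that the unmarshalJson import above is never actually used: the handler forwards the raw marshalled image as-is. If I understand dynamodb-marshaler correctly, the following variant (a sketch, with the rest of the handler unchanged) would strip the type descriptors and send the plain form shown earlier, with "value" as a real number:

// Sketch: unmarshal the stream image before forwarding it, so that
// DynamoDB's type descriptors are removed and "N" attributes become
// numbers again in the JSON that reaches Firehose.
var record = unmarshalJson(event.Records[0].dynamodb.NewImage);

var params = {
    DeliveryStreamName: 'DeliveryStreamName',
    Record: {
        Data: record
    }
};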

Solution

  • I solved it by creating the index mapping myself, instead of letting Kinesis Firehose create it, and declaring the "timestamp" attribute as { "type" : "date" } and the "value" attribute as { "type" : "float" }.

    For instance for this type of JSON:

    {
        "data": {
            "timestamp": "2017-05-09T11:30:41.484Z",
            "tag": "tag",
            "value": 33,
            "units": "units",
            "type": "type",
            "machine":{
                "name": "name",
                "type": "type",
                "company": "company"
            }
        },
        "id": "85209b75.f51ee8"
    }
    

    I manually created the following Elasticsearch index and mapping:

    PUT /index
    {
        "settings" : {
            "number_of_shards" : 2
        },
        "mappings" : {
            "type" : {
                "properties" : {
                    "data" : {
                        "properties" : {
                            "machine":{
                                "properties": {
                                    "name": { "type" : "text" },
                                    "type": { "type" : "text" },
                                    "company": { "type" : "text" }
                                }
                            },
                            "timestamp": { "type" : "date" },
                            "tag" : { "type" : "text" },
                            "value": { "type" : "float" },
                            "description":  { "type" : "text" },
                            "units":  { "type" : "text" },
                            "type" : { "type" : "text" },
                            "_msgid":  { "type" : "text" }
                        }
                    },
                    "id":  { "type" : "text" }      
                }
            }
        }
    }
    

    So, to solve it, I think the better solution is that in Lambda you check whether the index mapping exists and, if not, create it yourself, as sketched below.
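
    A minimal sketch of that idea (assuming the 'elasticsearch' npm client and an endpoint the Lambda is allowed to reach; an AWS-hosted domain would additionally need signed requests or an access policy that permits the Lambda):

    var elasticsearch = require('elasticsearch');

    // Hypothetical endpoint; replace with the real Elasticsearch host.
    var client = new elasticsearch.Client({
        host: 'https://my-elasticsearch-endpoint'
    });

    function ensureIndex(callback) {
        client.indices.exists({ index: 'index' }, function(err, exists) {
            if (err) return callback(err);
            if (exists) return callback(null);
            // Create the index with the explicit types from the mapping
            // above, so "timestamp" is a date and "value" is a float.
            client.indices.create({
                index: 'index',
                body: {
                    settings: { number_of_shards: 2 },
                    mappings: {
                        type: {
                            properties: {
                                data: {
                                    properties: {
                                        timestamp: { type: 'date' },
                                        value: { type: 'float' }
                                    }
                                }
                            }
                        }
                    }
                }
            }, callback);
        });
    }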