Search code examples
node.js, aws-lambda, amazon-kinesis, amazon-timestream

Lambda function triggered by a Kinesis stream to ingest data into Timestream


Here's my lambda function:

const AWS = require('aws-sdk');
//const dynamo = new AWS.DynamoDB.DocumentClient();
//const events ="events"

exports.handler = function(event, context) {

    event.Records.forEach(function(record) {
        
        console.log('Data payload: ', record.kinesis.data);
        
        //Kinesis data is base64 encoded so decode here
        var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
        
        console.log('Decoded payload:', payload);
        
        var data = JSON.parse(payload);
        
        console.log("Data: %j", data);
        
        console.log("Data value: %s", data.value);
        
        var clean = JSON.stringify(data.value).replace(/[`~!@#$%^&*()_|+\-=?;'",.<>\{\}\[\]\\\/]/gi, '')
        
        console.log("Data clean: %s", clean);
        
        var values = clean.split(":");
        var unit = values[0];
        var metric = values[1];
        
        console.log("Data unit: %s", unit);
        console.log("Data metric: %s", metric);
        
        console.log("Writing records");
        const currentTime = Date.now().toString(); // Unix time in milliseconds
       
        const item = [
            "deviceId": context.awsRequestId,
            "timestamp": data.datetime,
            "time": currentTime.toString(),
            "type": data.topic,
            "metric": metric,
            "forge": "uuid",
            "unit": unit
        };
  
        var params = {
        TableName: events,
        Item:{
            "deviceId": context.awsRequestId,
            "timestamp": data.datetime,
            "type": data.topic,
            "metric": metric,
            "forge": "uuid",
            "unit": unit
            }
        };
        
        
        // Instead of saving data to dynamo. I would like to store it on timestream.
        /*console.log("Saving Telemetry Data");
        
        dynamo.put(params, function(err, data) {
            if (err) {
                console.error("Unable to add event. Error JSON:", JSON.stringify(err, null, 2));
                context.fail();
            } else {
                console.log(data);
                console.log("Data saved:", JSON.stringify(params, null, 2));
                context.succeed();
                return {"message": "Item created in DB"};
            }
        });
        */
    });
};

How can I replicate the DynamoDB flow with the Timestream SDK, or is there a better/easier way?


Solution

  • const AWS = require('aws-sdk');
    
    const timestreamwrite = new AWS.TimestreamWrite();
    
    exports.handler = function(event, context) {
    
        event.Records.forEach(function(record) {
            
            console.log('Data payload: ', record.kinesis.data);
            
            //Kinesis data is base64 encoded so decode here
            var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
            
            console.log('Decoded payload:', payload);
            
            var data = JSON.parse(payload);
            
            console.log("Data: %j", data);
            
            
            var clean = JSON.stringify(data.value).replace(/[`~!@#$%^&*()_|+\-=?;'",.<>\{\}\[\]\\\/]/gi, '')
            
            var values = clean.split(":");
            var unit = values[0];
            var metric = values[1];
            
            const currentTime = Date.now().toString(); // Unix time in milliseconds
           
            const records = [];
           
            records.push({
               Dimensions: [{
                   Name: 'sensor',
                   Value: data.topic,
                   DimensionValueType: 'VARCHAR'
               }],
               MeasureName: unit,
               MeasureValue: metric,
               MeasureValueType: 'DOUBLE',
               Time: currentTime.toString()
           });
           
           const params = {
            DatabaseName: "greenforge-measurement",
            TableName: "sensor-events",
            Records: records
           };
           
           console.log("Saving Telemetry Data");
           
           timestreamwrite.writeRecords(params, function(err, data) {
                if (err) {
                    console.error("Unable to add event. Error JSON:", JSON.stringify(err, null, 2));
                    context.fail();
                } else {
                    console.log(data);
                    console.log("Data saved:", JSON.stringify(params, null, 2));
                    context.succeed();
                    return {"message": "Item created in Timestream"};
                }
            });
        });
    };