
Parse JSON array input in IoT Analytics


My channel receives multiple data records at once, as a JSON array, from an IoT device. A received message looks like this:

[
    {
      "Field1": "Value1",
      "Field2": "Value2",
      "Field3": "Value3"
    },
    {
      "Field1": "AnotherValue1",
      "Field2": "AnotherValue2",
      "Field3": "AnotherValue3"
    }
]

I create a dataset using the following SQL query:

SELECT * FROM mydatastore

When I run the dataset, the result returned is:

array                                              __dt 
-----                                              -----
[{field1=Value1, field2=Value2, field3=Value3}]    2019-02-21 00:00:00.000

My desired result is:

Field1           Field2           Field3
------           ------           ------
Value1           Value2           Value3
AnotherValue1    AnotherValue2    AnotherValue3

How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?


Solution

  • How can I get IoT Analytics to create a new row in the datastore for each element within the received JSON array?

    The simplest approach is probably to add a Lambda Activity to your Pipeline and have it split the single JSON payload into one record per array element. The details depend on the 'raw' structure of the messages sent to the Channel.
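A minimal boto3 sketch of such a pipeline (channel, then Lambda, then datastore), assuming the channel and datastore already exist. The pipeline name sample_pipeline and the activity names from_channel, split_array and to_datastore are hypothetical; sample_channel, sample_lambda and mydatastore are the names used in this thread. For an existing pipeline, update_pipeline takes the same two arguments as create_pipeline.

```python
def build_pipeline_activities(channel_name, lambda_name, datastore_name,
                              batch_size=10):
    """Return a pipelineActivities list: channel -> lambda -> datastore."""
    # Each activity points at the following one through "next"; the
    # datastore activity ends the chain, so it has no "next" key.
    return [
        {"channel": {"name": "from_channel",
                     "channelName": channel_name,
                     "next": "split_array"}},
        {"lambda": {"name": "split_array",
                    "lambdaName": lambda_name,
                    # How many channel messages one invocation receives.
                    "batchSize": batch_size,
                    "next": "to_datastore"}},
        {"datastore": {"name": "to_datastore",
                       "datastoreName": datastore_name}},
    ]


def create_split_pipeline(pipeline_name="sample_pipeline"):
    """Create the pipeline (hypothetical helper; needs AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    client = boto3.client("iotanalytics")
    return client.create_pipeline(
        pipelineName=pipeline_name,
        pipelineActivities=build_pipeline_activities(
            "sample_channel", "sample_lambda", "mydatastore"),
    )
```

The batchSize value is the setting the Lambda code below refers to: with a value above 1, a single event may contain several channel messages.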

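    For instance, we can send data to the Channel with the CLI batch-put-message command (with AWS CLI v2, which expects blob parameters such as payload to be base64-encoded by default, you may also need to add --cli-binary-format raw-in-base64-out):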

    aws iotanalytics batch-put-message --channel-name sample_channel --messages '[{"messageId": "message1", "payload": "{\"array\": [{\"Field1\": \"Value1\", \"Field2\": \"Value2\", \"Field3\": \"Value3\"},{\"Field1\": \"AnotherValue1\", \"Field2\": \"AnotherValue2\", \"Field3\": \"AnotherValue3\"}]}"}]'
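Hand-escaping the nested JSON in that command is error-prone. As an alternative (not part of the original answer), a minimal boto3 sketch that builds the same message with json.dumps; the helper names build_messages and send are hypothetical, and sample_channel is the channel from the command above.

```python
import json


def build_messages(records, message_id="message1"):
    """Wrap a list of records as one message whose payload holds "array"."""
    payload = json.dumps({"array": records})
    # batch_put_message takes the payload as a blob, so pass bytes.
    return [{"messageId": message_id, "payload": payload.encode("utf-8")}]


def send(records, channel_name="sample_channel"):
    """Send the records (hypothetical helper; needs AWS credentials)."""
    import boto3  # lazy import: only needed when actually sending

    client = boto3.client("iotanalytics")
    resp = client.batch_put_message(channelName=channel_name,
                                    messages=build_messages(records))
    # Rejected messages are reported in the response, not raised.
    return resp["batchPutMessageErrorEntries"]


records = [
    {"Field1": "Value1", "Field2": "Value2", "Field3": "Value3"},
    {"Field1": "AnotherValue1", "Field2": "AnotherValue2",
     "Field3": "AnotherValue3"},
]
messages = build_messages(records)
```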
    

    The Channel would then have a single message structured like this:

    {
      "messageId": "message1",
      "payload": {
        "array": [
          {
            "Field1": "Value1",
            "Field2": "Value2",
            "Field3": "Value3"
          },
          {
            "Field1": "AnotherValue1",
            "Field2": "AnotherValue2",
            "Field3": "AnotherValue3"
          }
        ]
      }
    }
    
    

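    If your Pipeline has a Lambda Activity, the message(s) from the Channel are passed to your Lambda function in the event argument as a list of decoded message payloads; for the message above, each element is the object holding the "array" key.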

    I created a simple Lambda function (Python 3.7) in the AWS Lambda console's inline editor and named it sample_lambda:

    import logging
    
    # The Lambda Python runtime already attaches a handler to the root logger
    # that sends output to CloudWatch Logs, so only the level is set here;
    # adding a second StreamHandler would write every log line twice.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    
    def lambda_handler(event, context):
        # This can be handy to see the raw structure of the incoming event;
        # it will log to the matching CloudWatch log group:
        # /aws/lambda/<name_of_the_lambda>
        # logger.info("raw event: %s", event)
    
        parsed_rows = []
    
        # Depending on the batchSize setting of the Lambda Pipeline Activity,
        # you may receive multiple messages in a single event
        for message_payload in event:
            # Each element of "array" becomes its own row in the datastore.
            # Note: messages without an "array" key are dropped here.
            if 'array' in message_payload:
                for row in message_payload['array']:
                    parsed_rows.append(dict(row))
    
        return parsed_rows
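The handler above silently drops any message that has no "array" key. If the channel can also receive single-record messages, a hypothetical variant (not the original answer's code) could pass those through unchanged. It is shown here with a local check; sample_event is an assumed event shaped as a list of decoded payloads, which is what the loop above iterates over.

```python
def lambda_handler(event, context):
    """Variant: split messages that carry "array", keep all others as-is."""
    rows = []
    for message_payload in event:
        if isinstance(message_payload.get("array"), list):
            # One output row per array element, as in the handler above.
            rows.extend(dict(row) for row in message_payload["array"])
        else:
            # Unlike the original, keep messages that carry no array.
            rows.append(message_payload)
    return rows


# Local check without deploying; context is unused, so None is fine here.
sample_event = [
    {"array": [
        {"Field1": "Value1", "Field2": "Value2", "Field3": "Value3"},
        {"Field1": "AnotherValue1", "Field2": "AnotherValue2",
         "Field3": "AnotherValue3"},
    ]},
    {"Field1": "Single", "Field2": "Record", "Field3": "Here"},
]
result = lambda_handler(sample_event, None)
print(len(result))  # 3
```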
    

    I then granted IoT Analytics permission to invoke the Lambda function, using the CLI:

    aws lambda add-permission --function-name sample_lambda --statement-id statm01 --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction
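If you script the setup instead, a sketch of the same call through boto3's Lambda client, mirroring the CLI arguments above; the helper names invoke_permission_kwargs and grant_iot_analytics_invoke are hypothetical.

```python
def invoke_permission_kwargs(function_name="sample_lambda",
                             statement_id="statm01"):
    """Arguments mirroring the add-permission CLI call above."""
    return {
        "FunctionName": function_name,
        # AWS rejects a StatementId that already exists in the policy.
        "StatementId": statement_id,
        "Action": "lambda:InvokeFunction",
        "Principal": "iotanalytics.amazonaws.com",
    }


def grant_iot_analytics_invoke(**kwargs):
    """Add the permission (hypothetical helper; needs AWS credentials)."""
    import boto3  # lazy import: only needed when calling AWS

    client = boto3.client("lambda")
    return client.add_permission(**invoke_permission_kwargs(**kwargs))
```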
    

    After reprocessing the Pipeline, the parsed rows are placed in the DataStore; running the DataSet then gives this net result:

    "array","field1","field2","field3","__dt"
    ,"Value1","Value2","Value3","2019-04-26 00:00:00.000"
    ,"AnotherValue1","AnotherValue2","AnotherValue3","2019-04-26 00:00:00.000"
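To check the result from a script rather than the console, a hedged sketch: it regenerates the dataset content with boto3, polls until generation finishes, downloads the CSV from the returned dataURI and parses it. The dataset name mydataset and the function names are hypothetical, and it assumes the pipeline reprocessing (started, for example, with start_pipeline_reprocessing) has already completed.

```python
import csv
import io
import time
import urllib.request


def parse_dataset_csv(text):
    """Parse a dataset's CSV content into one dict per row."""
    return list(csv.DictReader(io.StringIO(text)))


def fetch_fresh_rows(dataset_name="mydataset", poll_seconds=5):
    """Run the dataset again and return its rows (needs AWS credentials)."""
    import boto3  # lazy import: only needed when calling AWS

    client = boto3.client("iotanalytics")
    version = client.create_dataset_content(datasetName=dataset_name)["versionId"]
    # Dataset content is generated asynchronously; poll until it settles.
    while True:
        content = client.get_dataset_content(datasetName=dataset_name,
                                             versionId=version)
        state = content["status"]["state"]
        if state != "CREATING":
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"dataset content ended in state {state}")
    # dataURI is a presigned S3 URL pointing at the CSV result.
    with urllib.request.urlopen(content["entries"][0]["dataURI"]) as resp:
        return parse_dataset_csv(resp.read().decode("utf-8"))


# Parsing the result shown above yields one dict per array element.
sample = ('"array","field1","field2","field3","__dt"\n'
          ',"Value1","Value2","Value3","2019-04-26 00:00:00.000"\n'
          ',"AnotherValue1","AnotherValue2","AnotherValue3",'
          '"2019-04-26 00:00:00.000"\n')
rows = parse_dataset_csv(sample)
print(len(rows), rows[1]["field1"])  # 2 AnotherValue1
```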