
How do I access the data from an AWS Kinesis Data Stream event?


I'm working on a Python Lambda function that consumes an AWS Kinesis Data Stream, but I'm struggling to understand the shape of the Kinesis record events it receives. For example:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

Source: Using AWS Lambda with Amazon Kinesis

Where is the data I originally put on the Kinesis stream represented in this object, and how do I access it?


Solution

  • The data you put on the stream is represented as a Base64-encoded string in each record's kinesis.data key. For example (truncated):

    {
        "Records": [
            {
                "kinesis": {
                    ...
                    "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                    ...
                },
                ...
            },
            {
                "kinesis": {
                    ...
                    "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                    ...
                },
                ...
            }
        ]
    }
    

    To access the data, loop through the Records list and Base64 decode each record's kinesis.data value.

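    import base64
    
    
    # Handler name is an assumption: it must match the function's configured handler.
    def lambda_handler(event, context):
        # Lambda delivers a batch of stream records in event["Records"]
        for record in event["Records"]:
            # kinesis.data holds the original payload as a Base64 string
            decoded_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
    
            print(decoded_data)
            # Record 1: Hello, this is a test.
            # Record 2: This is only a test.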
    

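    Note: This example assumes that the data written to the Kinesis stream was UTF-8 encoded text. Kinesis stores each record's data as raw bytes; the Base64 string is how those bytes are represented in the JSON event that Lambda receives.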
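To try the decoding loop locally without deploying, here is a minimal sketch. It builds a trimmed-down event with the same Records/kinesis/data shape as the sample event above (only the data field is filled in), producing each data value with base64.b64encode, and then decodes it the same way the handler does. The names make_test_event and decode_records are hypothetical helpers for illustration, not part of any AWS library.

```python
import base64


def make_test_event(messages):
    """Build a minimal, hypothetical Kinesis-style event for local testing.

    Only the field the decoding loop reads is included; a real event
    carries many more keys (see the sample event above).
    """
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(m.encode("utf-8")).decode("ascii")}}
            for m in messages
        ]
    }


def decode_records(event):
    """Return the UTF-8 decoded payload of every record in the event."""
    return [
        base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        for record in event["Records"]
    ]


event = make_test_event(["Hello, this is a test.", "This is only a test."])

# The encoded values are the same "data" strings shown in the sample event.
print([r["kinesis"]["data"] for r in event["Records"]])
# ['SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==', 'VGhpcyBpcyBvbmx5IGEgdGVzdC4=']

print(decode_records(event))
# ['Hello, this is a test.', 'This is only a test.']
```

Encoding the two test sentences reproduces the exact data strings from the sample event, which confirms that kinesis.data is simply the original payload in Base64 form.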
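The note about UTF-8 matters if producers might write something other than plain text. The following is a minimal sketch that decodes defensively, under the assumption (not stated in the original answer) that producers may send UTF-8 JSON documents, plain UTF-8 text, or arbitrary binary data. decode_payload is a hypothetical helper name.

```python
import base64
import json


def decode_payload(data_b64):
    """Decode one record's kinesis.data value as defensively as possible.

    Assumption: producers may send UTF-8 JSON, plain UTF-8 text, or binary.
    Returns a parsed JSON value, a str, or the raw bytes, in that order.
    """
    raw = base64.b64decode(data_b64)  # the original bytes the producer wrote
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw  # not valid UTF-8: return the bytes unchanged
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text  # valid UTF-8 text that is not JSON


def lambda_handler(event, context):
    for record in event["Records"]:
        payload = decode_payload(record["kinesis"]["data"])
        print(type(payload).__name__, payload)
```

One caveat of this design: plain text that happens to be valid JSON (for example "42" or "true") comes back parsed rather than as a string. If producers agree on a single format, decoding only that format is simpler and less surprising.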