I'm working on a Python Lambda function that consumes an AWS Kinesis data stream, but I'm struggling to understand the shape of Kinesis record events. For example:
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1545084650.987
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
"approximateArrivalTimestamp": 1545084711.166
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
}
Source: Using AWS Lambda with Amazon Kinesis
Where in this object is the data I originally put on the Kinesis stream? And how do I access it?
The data you put on the stream is represented as a Base64-encoded string under each record's kinesis.data key. For example (truncated):
{
"Records": [
{
"kinesis": {
...
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
...
},
...
},
{
"kinesis": {
...
"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
...
},
...
}
]
}
To access the data, loop through each Records object and Base64-decode the kinesis.data value.
import base64

for record in event["Records"]:
    # kinesis.data is Base64 text: decode it to bytes, then to a UTF-8 string
    decoded_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
    print(decoded_data)

# Record 1: Hello, this is a test.
# Record 2: This is only a test.
Note: This example assumes that the data sent to the Kinesis stream was UTF-8 encoded before it was Base64-encoded.
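If you can't be sure every producer writes UTF-8 text, one option is to fall back to the raw bytes when decoding fails. The following is only a minimal sketch of that idea: the helper name decode_kinesis_records is hypothetical, and lambda_handler assumes the default Python handler signature.

```python
import base64


def decode_kinesis_records(event):
    """Return the payload of each record: a str if it is valid UTF-8, else raw bytes."""
    payloads = []
    for record in event.get("Records", []):
        # kinesis.data holds the producer's bytes as Base64 text
        raw = base64.b64decode(record["kinesis"]["data"])
        try:
            payloads.append(raw.decode("utf-8"))
        except UnicodeDecodeError:
            # Assumption: non-UTF-8 payloads are binary, so keep them untouched
            payloads.append(raw)
    return payloads


def lambda_handler(event, context):
    # Hypothetical handler: just prints each decoded payload
    for payload in decode_kinesis_records(event):
        print(payload)
```

With the sample event from the question, decode_kinesis_records(event) should return the two test sentences as strings.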