Disclaimer: This is the first time I am using AWS Kinesis, so I might be expecting something that is not correct.
I have a very simple AWS Lambda function that inserts data into Kinesis. The API response indicates that there were no errors, and no exceptions are thrown; Kinesis simply confirms that everything went fine. However, when I go to the AWS console and try to query the data, there is nothing there!
const records = [
  { partitionKey: '1', data: 'Record 1' },
  { partitionKey: '2', data: 'Record 2' },
  { partitionKey: '3', data: 'Record 3' },
]
const params = {
  Records: records.map((record) => ({
    Data: record.data,
    PartitionKey: record.partitionKey,
  })),
  StreamName: streamName,
}
const response = await kinesis.putRecords(params).promise()
const success = response.FailedRecordCount === 0
success is true.
So, I tried to retrieve the data using code within the same Lambda function. Right after inserting the data into Kinesis, I added the following code:
const params2 = {
  ShardIteratorType: 'LATEST',
  ShardId: 'shardId-000000000000',
  StreamName: streamName,
}
const response2 = await kinesis.getShardIterator(params2).promise()
const shardIterator = response2.ShardIterator
const records2 = await kinesis.getRecords({ ShardIterator: shardIterator! }).promise()
records2.Records is an empty array.
To my surprise, no records were returned: the response was empty, and again no exception was thrown.
I checked the shardId, and it does exist.
So my question is, what am I doing wrong?
Why does AWS Kinesis indicate that it inserted the data, but nothing seems to be happening? In the AWS console, I can see activities in the put and get operations from the usage graphs.
As you have recently started using Kinesis Data Streams, I recommend reading the architecture and terminology document. It will give you insights into how data is internally streamed in the system.
According to AWS documentation:
A Kinesis data stream is a set of shards. Each shard has a sequence of data records. Each data record has a sequence number that is assigned by Kinesis Data Streams.
Based on this definition, Kinesis Data Streams internally uses shards to stream data. In the on-demand capacity mode, a stream is initially allocated 4 shards, and the shard count grows with your application's throughput. In the provisioned mode, you choose the shard count yourself when you create the stream. Either way, a data stream usually consists of more than one shard handling the data.
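You can check how many shards your stream actually has before hard-coding a shard ID anywhere. One quick way is the list-shards CLI command (substitute your own stream name; the placeholder below is kept from the examples later in this answer):

```shell
# List every shard in the stream, so you know which shard IDs exist
# before pointing a consumer at one of them.
aws kinesis list-shards --stream-name <ENTER_STREAM_NAME_HERE>
```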
Regarding the code you used on the consumer side: you hard-coded the shard ID value as shardId-000000000000, but you cannot be certain that the data you published was sent to that particular shard. So, apart from checking the FailedRecordCount on the publisher side, log the complete response of the putRecords method; it will show you which shardId was used to stream your data.
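For example, a small helper can pull the distinct shard IDs out of a putRecords response. This is just a sketch: the interfaces below only mirror the handful of response fields used in this answer, and shardsUsed is a hypothetical name.

```typescript
// Minimal shape of the fields we care about in a PutRecords response.
interface PutRecordsResultEntry {
  SequenceNumber?: string
  ShardId?: string
  ErrorCode?: string
}

interface PutRecordsOutput {
  FailedRecordCount: number
  Records: PutRecordsResultEntry[]
}

// Return the distinct shard IDs that successfully received records.
function shardsUsed(response: PutRecordsOutput): string[] {
  const ids = response.Records
    .map((r) => r.ShardId)
    .filter((id): id is string => id !== undefined)
  return [...new Set(ids)]
}
```

Feeding it a response like the PutRecords response shown later in this answer would give you ['shardId-000000000005'], telling you exactly which shard to read from.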
To simplify things, I used AWS CLI commands to publish the data, which you can easily translate to Node.js to achieve the same result.
PutRecords Command:
aws kinesis put-records \
--stream-name <ENTER_STREAM_NAME_HERE> \
--records Data="Record 1",PartitionKey="1" Data="Record 2",PartitionKey="2" Data="Record 3",PartitionKey="3" --cli-binary-format raw-in-base64-out
PutRecords Response:
{
    "FailedRecordCount": 0,
    "Records": [
        {
            "SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
            "ShardId": "shardId-000000000005"
        },
        {
            "SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
            "ShardId": "shardId-000000000005"
        },
        {
            "SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
            "ShardId": "shardId-000000000005"
        }
    ]
}
If you observe the above response, when I executed the put-records command, the data was sent to shardId-000000000005.
Let's assume that I haven't consumed any records from this shard before, so I'm going to use TRIM_HORIZON as the ShardIteratorType to get the data pointer value.
GetShardIterator command:
aws kinesis get-shard-iterator \
--stream-name <ENTER_STREAM_NAME_HERE> \
--shard-id shardId-000000000005 \
--shard-iterator-type TRIM_HORIZON
GetShardIterator Response:
{
    "ShardIterator": "AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ=="
}
The above response contains the pointer to read the oldest (untrimmed) data record from the shard shardId-000000000005. Once we have the iterator value, we can use the get-records command to fetch the records.
GetRecords command:
aws kinesis get-records \
--shard-iterator AAAAAAAAAAH2b4HgeaV/7klnxSTYd3/T9YcQ2eKxjELpkEgXAy1k0hVidh05ZeIUdMBHo0SdJOjBq5HWwGG3dZPCKM8kTBYCWYLhv7OrC9PQo6qdRuhC8uY4LH6GEBenMgf7dzS1wD/oep8EKZvSblDYVCfcpoXT4NbWIt8D5mvx4ZlPssmyuRR92DM0ywU6PjTM8tgOoixD5kEDro/SANFc5ohKIiOHxWjUsfpgvMoJFIFtLpkgQQ==
GetRecords response:
{
    "Records": [
        {
            "SequenceNumber": "49643231014796346134140772956682328171122645057423802450",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.035000+00:00",
            "Data": "UmVjb3JkIDE=",
            "PartitionKey": "1"
        },
        {
            "SequenceNumber": "49643231014796346134140772956683537096942259686598508626",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
            "Data": "UmVjb3JkIDI=",
            "PartitionKey": "2"
        },
        {
            "SequenceNumber": "49643231014796346134140772956684746022761874315773214802",
            "ApproximateArrivalTimestamp": "2023-08-02T22:15:34.038000+00:00",
            "Data": "UmVjb3JkIDM=",
            "PartitionKey": "3"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHIu30Hail1drAR8L9vok/zazMmRawSMqVACRymRKho+06rk6PHZ0G9JbYJLzIjUoo3UVT3XiqcfTL/QO6Dt1SJhY7p2P50V8Dhv2pkGavpNnh43114Mp4i3HAUSsYkwNRW8EJSIcJ/LZysNG1z0KLmbBp+Vau5UOj9mbZu4aU7H+97WqJkoHvK8/BC2AcMnVUlR03/xVHS8zy9fer8v6bRCjDgJMCU9CHyJamX5Douqg==",
    "MillisBehindLatest": 0
}
In your code, you used LATEST as the ShardIteratorType value, which creates a pointer just after the last published record in the shard. So, if you use the LATEST iterator type, make sure you get the iterator value before publishing the data. You can also consider using the other iterator types described in this document.
I presume you have now identified the problems in your code, which could be in two places:
1. Your data may have been published to a different shardId than the one you hard-coded in your consumer.
2. Because you used LATEST as the iterator type only after publishing, even if the data did go to shardId-000000000000, you won't be able to retrieve the values that you published earlier.