Tags: amazon-web-services, .net-core, amazon-kinesis

AWS Kinesis NextShardIterator Is Never Null


Context: I'm trying to read records from a Kinesis stream, following the API reference. I'm using .NET Core 3.1.

I write data to the Kinesis stream through the API, and that part works without problems. Reading is where I have trouble. I call GetRecords in a do-while loop whose condition checks whether NextShardIterator is null. That value never becomes null, so I cannot break out of the loop.

Some answers quote the documentation: "NextShardIterator: The next position in the shard from which to start sequentially reading data records. If set to null, the shard has been closed and the requested iterator does not return any more data."

I have only 1 stream and 1 shard. I put 2 records, then run the reading method, and it returns those records. After that, however, NextShardIterator is still not null, even though all records have been consumed.

  • List the streams (I have only 1 stream)
  • Describe the stream and get its shard (I have only 1 shard)
  • Get a shard iterator of type TRIM_HORIZON
  • Call GetRecords in a loop (do { ... } while (NextShardIterator != null))
  • After receiving records, request an AFTER_SEQUENCE_NUMBER shard iterator
  • If no records come back, request a LATEST shard iterator

The PutRecord code looks like this:

        public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
        {
            byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
            using MemoryStream memoryStream = new MemoryStream(bytedata);
            var putRecordRequest = new PutRecordRequest();
            putRecordRequest.StreamName = myStreamName;
            putRecordRequest.PartitionKey = "partition" + documentId;
            putRecordRequest.Data = memoryStream;
            try
            {
                var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
                return new ResponseModel
                {
                    Data = putRecordResponse,
                    Status = ResponseStatus.Success,
                    Message = "Successfully put record!"
                };
            }
            catch (Exception e)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Error,
                    Message = "PutRecord Error: " + e.Message
                };
            }
        }

The GetRecords code, which is where the problem occurs, looks like this:

        public async Task<ResponseModel> GetRecords()
        {
            var listStreams = await ListStreams();
            if (listStreams.StreamNames.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Stream!"
                };
            }
            myStreamName = listStreams.StreamNames[0];

            var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
            if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Stream status: " + describeStreams.StreamDescription.StreamStatus
                };
            }

            var shards = describeStreams.StreamDescription.Shards;
            if (shards.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Shard (or data)!"
                };
            }
            var shardId = shards[0].ShardId;

            var shardIterator = await GetShardIterator(shardId);
            if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "ShardIterator is null or empty!"
                };
            }

            var getRecords = await GetRecords(shardIterator.ShardIterator);
            Console.WriteLine("First Iterator: " + shardIterator.ShardIterator);
            var dataList = new List<Record>();

            do
            {
                if (getRecords.Records.Count == 0)
                {
                    Console.WriteLine("Records are empty!");
                    // Await instead of blocking on .Result inside an async method.
                    var nextShardIterator = (await GetShardLatest(shardId)).ShardIterator;
                    getRecords = await GetRecords(nextShardIterator);
                    Console.WriteLine("Latest Iterator: " + nextShardIterator);
                }
                else
                {
                    Console.WriteLine("We have records!");
                    foreach (var record in getRecords.Records)
                    {
                        dataList.Add(record);
                    }
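                    // Await instead of blocking on .Result inside an async method.
                    var lastSequenceNumber = getRecords.Records[getRecords.Records.Count - 1].SequenceNumber;
                    var nextShardIterator = (await GetShardIteratorWithSequence(shardId, lastSequenceNumber)).ShardIterator;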
                    Console.WriteLine("AfterSequence Iterator: " + nextShardIterator);
                    getRecords = await GetRecords(nextShardIterator);
                }
            } while (getRecords.NextShardIterator != null);

            return new ResponseModel
            {
                Data = dataList,
                Status = ResponseStatus.Success,
                Message = "Successful"
            };
        }

Solution

  • A Kinesis stream is a potentially infinite sequence of records that multiple producers can add to at any time. As a result, the NextShardIterator of an open shard will never be null. It becomes null only after the shard has been closed and read to its end.

    If you want to break out of the loop when you get to the "end" of the stream, look at the MillisBehindLatest field in the GetRecords response. To quote the documentation:

    "A value of zero indicates that record processing is caught up, and there are no new records to process at this moment."

    Beware, however, that new records might be added at any time. If you do break out of the loop, be sure to save the SequenceNumber of the last record you processed, so that you can pick up where you left off with an AFTER_SEQUENCE_NUMBER shard iterator.
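
As a rough illustration of this advice, here is a minimal sketch of a read loop that stops once MillisBehindLatest reaches zero and returns the last sequence number it saw. It assumes the AWS SDK for .NET v3 (the AWSSDK.Kinesis package). The class and method names (ShardReader, ReadUntilCaughtUpAsync, GetResumeIteratorAsync) and the 250 ms delay are illustrative choices, not part of the SDK or of the code in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

// Hypothetical helper class; not part of the AWS SDK.
public static class ShardReader
{
    // Reads from the given iterator until the consumer has caught up with
    // the tip of the shard (MillisBehindLatest == 0) or the shard is closed
    // (NextShardIterator == null).
    public static async Task<(List<Record> Records, string LastSequenceNumber)> ReadUntilCaughtUpAsync(
        IAmazonKinesis client, string shardIterator, CancellationToken ct = default)
    {
        var records = new List<Record>();
        string lastSequenceNumber = null;

        while (shardIterator != null)
        {
            var response = await client.GetRecordsAsync(
                new GetRecordsRequest { ShardIterator = shardIterator }, ct);

            // Guard against a null collection, which some SDK versions may return.
            foreach (var record in response.Records ?? new List<Record>())
            {
                records.Add(record);
                lastSequenceNumber = record.SequenceNumber;
            }

            // Zero means this response reached the tip of the shard:
            // there is nothing more to read at this moment.
            if (response.MillisBehindLatest == 0)
            {
                break;
            }

            // Keep following the iterator returned by GetRecords; there is
            // no need to request a fresh iterator on every pass.
            shardIterator = response.NextShardIterator;

            // Assumed pacing to stay under the per-shard limit of
            // five read transactions per second.
            await Task.Delay(TimeSpan.FromMilliseconds(250), ct);
        }

        return (records, lastSequenceNumber);
    }

    // Builds an iterator that resumes right after the saved sequence number,
    // or starts from the oldest available record if nothing was saved.
    public static async Task<string> GetResumeIteratorAsync(
        IAmazonKinesis client, string streamName, string shardId, string lastSequenceNumber)
    {
        var request = new GetShardIteratorRequest
        {
            StreamName = streamName,
            ShardId = shardId
        };

        if (lastSequenceNumber == null)
        {
            request.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;
        }
        else
        {
            request.ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
            request.StartingSequenceNumber = lastSequenceNumber;
        }

        var response = await client.GetShardIteratorAsync(request);
        return response.ShardIterator;
    }
}
```

A caller would persist LastSequenceNumber somewhere durable after each run and pass it to GetResumeIteratorAsync on the next run. Note that this sketch keeps using the NextShardIterator from each GetRecords response instead of requesting a LATEST or AFTER_SEQUENCE_NUMBER iterator on every pass, which is the pattern the question's loop uses.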