Search code examples
node.js, amazon-web-services, producer-consumer, amazon-kinesis, data-stream

Data is getting lost from the Kinesis Stream. What could be the reason for this?


I am facing an issue. I am testing with 3 consumers and 1 producer. Of all the keystrokes the producer sends, the consumers receive only some; the rest never arrive. What could be the reason for this?

In the following screenshot, the producer sent a, b, c and d, but only d was received. (Screenshot: enter image description here)

The bottom-right window is the producer and the other 3 are consumers listening to the same stream. As we can see, only the consumer in the bottom left has received d, and the other data has been lost.

Code that I am testing with:

Producer:

// Input stream for the producer; its "data" events are handled further down.
var stdin = process.openStdin();

/**
 * Sends one record to the 'test-stream1' Kinesis stream.
 *
 * Every record is written with the same partition key ('users'). The outcome
 * is only logged: the response on success, the error and its stack on failure.
 * Relies on a `kinesis` client defined elsewhere.
 *
 * @param {string} input - payload to store in the record's Data field
 */
function insert( input ) {
    var request = {
        StreamName: 'test-stream1',
        PartitionKey: 'users',
        Data: input
    };

    kinesis.putRecord( request, function ( err, data ) {
        if ( !err ) {
            console.log( data ); // successful response
            return;
        }
        console.log( err, err.stack ); // an error occurred
    } );
}



// Each chunk read from stdin is converted to a string, stripped of surrounding
// whitespace, and handed to insert() as one record.
stdin.addListener( "data", function ( chunk ) {
    var typedText = chunk.toString().trim();
    insert( typedText );
} );

Consumer:

    /**
     * Performs one read pass over every shard of 'test-stream1'.
     *
     * On each call the stream is described again and a brand-new iterator of
     * type LATEST is requested per shard before a single getRecords call is
     * made with it. No iterator is kept between calls.
     *
     * Only a describeStream failure is logged; failures of getShardIterator
     * and getRecords are dropped silently. Relies on a `kinesis` client
     * defined elsewhere.
     */
    function getRecord() {
        var STREAM = 'test-stream1';

        // Fetches a fresh LATEST iterator for one shard and prints whatever a
        // single getRecords call returns for it.
        function readShard( shard ) {
            var iteratorRequest = {
                StreamName: STREAM,
                ShardId: shard.ShardId,
                ShardIteratorType: 'LATEST'
            };

            kinesis.getShardIterator( iteratorRequest, function ( iteratorErr, iteratorData ) {
                if ( iteratorErr ) {
                    return; // iterator errors are ignored
                }

                var recordsRequest = { ShardIterator: iteratorData.ShardIterator };
                kinesis.getRecords( recordsRequest, function ( recordsErr, recordsData ) {
                    if ( recordsErr ) {
                        return; // read errors are ignored
                    }
                    recordsData.Records.forEach( function ( record ) {
                        console.log( record.Data.toString(), shard.ShardId );
                    } );
                } );
            } );
        }

        kinesis.describeStream( { StreamName: STREAM }, function ( describeErr, streamData ) {
            if ( describeErr ) {
                console.log( describeErr, describeErr.stack ); // an error occurred
                return;
            }
            streamData.StreamDescription.Shards.forEach( function ( shard ) {
                readShard( shard );
            } );
        } );
    }

    // Run getRecord once every second (1000 ms * 1).
    setInterval( getRecord, 1000 * 1 );

I have used iterator type as LATEST so that each consumer gets the latest data from the producer.


Solution

  • If I am not mistaken, you are always reading after the most recent records. This is configured via ShardIteratorType: 'LATEST'. The documentation says:

    LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.

    This should only be used to get the very first iterator and afterwards you need to get the next iterator starting at the exact same position where you ended with the last one.

    Therefore, use the NextShardIterator returned in each GetRecords response (if present) to follow up on the upcoming records. See doc.

    Currently you are discarding the iterator after each interval and starting at the very end again.

    Example

    I took your code and moved the setInterval so that it only repeats the getRecords request, each time with the next iterator. Call getRecord() once to start polling.

    /**
     * Starts tailing every shard of 'test-stream1'.
     *
     * The stream's shards are looked up once and one LATEST iterator is
     * requested per shard. Each shard is then polled with getRecords once per
     * second, and every poll continues from the NextShardIterator of the
     * previous successful response, so the read position carries over from
     * poll to poll. Call this function once; it installs its own timers.
     *
     * Errors are logged, never thrown. Relies on a `kinesis` client defined
     * elsewhere.
     */
    function getRecord() {
      kinesis.describeStream({ StreamName: 'test-stream1' }, function ( err, streamData ) {
        if ( err ) {
          console.log( err, err.stack ); // an error occurred
          return;
        }

        streamData.StreamDescription.Shards.forEach( shard => {
          kinesis.getShardIterator({
            ShardId: shard.ShardId,
            ShardIteratorType: 'LATEST',
            StreamName: 'test-stream1'
          }, function ( err, shardIteratordata ) {
            if ( err ) {
              console.log( err, err.stack ); // an error occurred
              return;
            }

            // Read position for this shard; replaced after every successful poll.
            let shardIterator = shardIteratordata.ShardIterator;

            const timer = setInterval(function () {
              kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
                if ( err ) {
                  // Keep the current iterator so the next tick retries from the same position.
                  console.log( err, err.stack ); // an error occurred
                  return;
                }

                recordsData.Records.forEach( record => {
                  console.log( record.Data.toString(), shard.ShardId );
                });

                if ( recordsData.NextShardIterator ) {
                  shardIterator = recordsData.NextShardIterator;
                } else {
                  // No follow-up iterator means the shard has nothing further to
                  // return; stop polling instead of calling getRecords with an
                  // empty iterator every second.
                  clearInterval( timer );
                }
              });
            }, 1000 * 1 );
          });
        });
      });
    }