I am facing an issue. I am testing with 3 consumers and 1 producer. Of all the keystrokes the producer sends, the consumers do not receive all of the data. What could be the reason for this?
In the following screenshot, the producer sent `a`, `b`, `c` and `d`, but only `d` was received.
The bottom-right pane is the producer and the other 3 are the consumers listening to the same stream. As we can see, only the consumer in the bottom-left pane has received `d`, and the other data has been lost.
Code that I am testing with:
Producer:
// Handle on the process's standard input; the "data" listener registered
// further down reads what the user types from it.
var stdin = process.openStdin();
/**
 * Publishes a single record to the 'test-stream1' Kinesis stream.
 *
 * Every record is sent with the fixed partition key 'users'. The outcome is
 * reported on the console: the error and its stack on failure, the service
 * response on success.
 *
 * @param {*} input - Payload passed through unchanged as the record's Data.
 */
function insert( input ) {
  const onPutComplete = function ( err, data ) {
    if ( err ) {
      console.log( err, err.stack ); // request failed
      return;
    }
    console.log( data ); // request succeeded
  };

  kinesis.putRecord( {
    Data: input,
    PartitionKey: 'users',
    StreamName: 'test-stream1'
  }, onPutComplete );
}
// Each chunk arriving on stdin is converted to text, stripped of surrounding
// whitespace, and handed to insert() to be published.
stdin.on( "data", ( chunk ) => {
  const text = chunk.toString().trim();
  insert( text );
} );
Consumer:
/**
 * Performs one read pass over every shard of 'test-stream1'.
 *
 * For each shard reported by describeStream, a shard iterator of type
 * 'LATEST' is requested and a single getRecords call is made with it. Each
 * returned record is printed together with the id of the shard it came from.
 *
 * Failures at any of the three steps are logged (error plus stack) rather
 * than silently dropped, so a failing request is distinguishable from
 * "no records were returned".
 *
 * NOTE: a brand-new 'LATEST' iterator is requested on every call; no iterator
 * is carried over from one call to the next.
 */
function getRecord() {
  kinesis.describeStream( {
    StreamName: 'test-stream1'
  }, function ( err, streamData ) {
    if ( err ) {
      console.log( err, err.stack ); // an error occurred
      return;
    }

    streamData.StreamDescription.Shards.forEach( shard => {
      kinesis.getShardIterator( {
        ShardId: shard.ShardId,
        ShardIteratorType: 'LATEST',
        StreamName: 'test-stream1'
      }, function ( err, shardIteratordata ) {
        if ( err ) {
          // Previously swallowed; report it like the describeStream failure.
          console.log( err, err.stack ); // an error occurred
          return;
        }

        kinesis.getRecords( {
          ShardIterator: shardIteratordata.ShardIterator
        }, function ( err, recordsData ) {
          if ( err ) {
            // Previously swallowed; report it like the describeStream failure.
            console.log( err, err.stack ); // an error occurred
            return;
          }

          recordsData.Records.forEach( record => {
            console.log( record.Data.toString(), shard.ShardId );
          } );
        } );
      } );
    } );
  } );
}
// Run getRecord repeatedly, once every second.
const POLL_INTERVAL_MS = 1000;
setInterval( getRecord, POLL_INTERVAL_MS );
I have used the iterator type `LATEST` so that each consumer gets the latest data from the producer.
If I am not mistaken, you are always reading after the most recent records. This is configured via `ShardIteratorType: 'LATEST'`.
According to the documentation it says
LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
This should only be used to get the very first iterator and afterwards you need to get the next iterator starting at the exact same position where you ended with the last one.
Therefore you can use the `NextShardIterator` returned by the `GetRecords` request, if present, to follow up on the incoming records. See doc.
Currently you are discarding the iterator after each interval and starting at the very end again.
I took your code and moved the `setInterval` so that it only repeats the `getRecords` request, each time with the next iterator:
/**
 * Starts tailing every shard of 'test-stream1'.
 *
 * For each shard reported by describeStream, one 'LATEST' shard iterator is
 * requested exactly once. After that, getRecords is repeated every second,
 * always with the NextShardIterator returned by the previous response, so
 * reading resumes where the last call stopped instead of jumping to the end
 * of the shard again. Each record is printed with the id of its shard.
 *
 * Errors from any request are logged (error plus stack). A failed getRecords
 * call keeps the current iterator and is retried on the next tick.
 */
function getRecord() {
  kinesis.describeStream({ StreamName: 'test-stream1' }, function ( err, streamData ) {
    if ( err ) {
      console.log( err, err.stack ); // an error occurred
      return;
    }

    streamData.StreamDescription.Shards.forEach( shard => {
      kinesis.getShardIterator({
        ShardId: shard.ShardId,
        ShardIteratorType: 'LATEST',
        StreamName: 'test-stream1'
      }, function ( err, shardIteratordata ) {
        if ( err ) {
          console.log( err, err.stack ); // an error occurred
          return;
        }

        // Position in this shard; advanced after every successful read.
        let shardIterator = shardIteratordata.ShardIterator;
        // True while a getRecords call is outstanding. Issuing a second call
        // with the same iterator would return the same records twice.
        let requestPending = false;

        const timer = setInterval( function () {
          if ( requestPending ) {
            return;
          }
          requestPending = true;

          kinesis.getRecords({ ShardIterator: shardIterator }, function ( err, recordsData ) {
            requestPending = false;
            if ( err ) {
              console.log( err, err.stack ); // an error occurred
              return;
            }

            recordsData.Records.forEach( record => {
              console.log( record.Data.toString(), shard.ShardId );
            });

            // Continue from where this response ended. (The original also
            // assigned to an undeclared `iterator`, leaking a global and
            // throwing a ReferenceError in strict mode.)
            shardIterator = recordsData.NextShardIterator;

            // No next iterator means there is nothing further to read from
            // this shard, so stop polling it instead of sending an invalid
            // request every second.
            if ( !shardIterator ) {
              clearInterval( timer );
            }
          });
        }, 1000 * 1 );
      });
    });
  });
}