I am using spring kinesis binder for consuming messages in batch mode
Some times I am not able to consume the messages from streams. This is not happening all the time
No records for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49594358705006691330463332232285735104253344290967650306', timestamp=null, stream='mystream-1', shard='shardId-000000000000', reset=false}, state=CONSUME}] on sequenceNumber [null]. Suspend consuming for [1000] milliseconds.
I keep getting this message and messages are not consumed.
My Conf looks like below
spring:
cloud:
stream:
bindings:
input:
group: mygroup
destination: mystream-1
content-type: application/json
output:
destination: mystream-2
content-type: application/json
kinesis:
bindings:
input:
consumer:
listenerMode: batch
idleBetweenPolls: 60000
consumer-backoff: 1000
binder:
headers: x-item_id,x-message_type
locks:
table: lTLocks
leaseDuration: 30
refreshPeriod: 3000
checkpoint:
table: lTCheckPoints
There are messages in stream but I am not consume intermittently. Can you please help.
I think your checkpoint store and shard in the Kinesis stream contain different sequence numbers.
Please, consider to use clean up lTCheckPoints
table before starting to consume from the stream.
I don't say that you need to do that all the time, but at least for this sake of clean testing environment.