I have a springboot consumer app. When I ran it the first time, it was consuming messages from the Kafka topic. But when I ran it again, it stopped consuming. In the logs I see the following message.
2022-08-02 00:14:08.130 INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Fetch position FetchPosition{offset=17400, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition CraveVideoTestLog-0, resetting offset
2022-08-02 00:14:08.135 INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Resetting offset for partition CraveVideoTestLog-0 to position FetchPosition{offset=56464, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
I understand that the consumer could not get the offset. In situations like this, the consumer will refer to auto-offset-reset property. As you can see, I have set it to earliest
, hoping that the consumer would read from the beginning. But it does not.
application.yml
spring:
application:
name: my-transformation
kafka:
bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
retries: 2
compression-type: snappy
consumer:
group-id: ${GROUP_ID:my-transformation}
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
In my Java class
@SendTo("output-topic-here")
@KafkaListener(topics = "${my-transformation.input.topic}", errorHandler = "myErrorHandler")
public Message<?> process(Message<String> in, ConsumerRecord<String, String> record) throws Exception {
log.info("Going to process event for key: {} offset: {} partition: {}", record.key(), record.offset(), record.partition());
}
I tried out a few of things.
auto-offset-reset
value to none
. As expected, it threw an exception complaining about the offset.I feel I am missing something really silly, but could not figure out what it is.
Finally, I found out the reason for my consumer's behaviour. The topic's retention policy was set to delete and retention time was set to default(1 week).
The reason why the consumer stopped consuming records was because the topic's retention time had elapsed. So the entire data in the topic was deleted and hence the consumer had nothing to read. When I pushed new records into the topic again, the consumer started consuming.