Tags: spring-boot, apache-kafka, spring-kafka

Spring Boot consumer won't consume any messages


I have a Spring Boot consumer app. When I ran it the first time, it consumed messages from the Kafka topic, but when I ran it again, it stopped consuming. In the logs I see the following messages.

2022-08-02 00:14:08.130  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Fetch position FetchPosition{offset=17400, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition CraveVideoTestLog-0, resetting offset
2022-08-02 00:14:08.135  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Resetting offset for partition CraveVideoTestLog-0 to position FetchPosition{offset=56464, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

I understand that the consumer's committed offset is out of range. In situations like this, the consumer falls back to the auto-offset-reset property. As you can see, I have set it to earliest, hoping that the consumer would read from the beginning, but it does not.

application.yml

spring:
  application:
    name: my-transformation
  kafka:
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      retries: 2
      compression-type: snappy
    consumer:
      group-id: ${GROUP_ID:my-transformation}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

In my Java class

    @SendTo("output-topic-here")
    @KafkaListener(topics = "${my-transformation.input.topic}", errorHandler = "myErrorHandler")
    public Message<?> process(Message<String> in, ConsumerRecord<String, String> record) throws Exception {
        log.info("Going to process event for key: {} offset: {} partition: {}", record.key(), record.offset(), record.partition());
    }

I tried out a few things.

  • I set the auto-offset-reset value to none. As expected, it threw an exception complaining about the missing offset.
  • I created a new consumer group. I could see in the Confluent UI that the new consumer group was created, but, again, the messages were not consumed (see the offset-diagnostic sketch after this list).
  • I read through a lot of posts. Here are a few that are very close to the issue I am facing: Link 1 Link 2
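
To see where the group's committed offset sits relative to what the broker still retains, a small AdminClient sketch like the one below can help. It is only a diagnostic sketch: the bootstrap server, topic (CraveVideoTestLog) and group id (my-transformation-local) are taken from the logs above, and the kafka-clients AdminClient API is assumed to be available on the classpath (spring-kafka already pulls it in).

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetDiagnostics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        TopicPartition tp = new TopicPartition("CraveVideoTestLog", 0);

        try (AdminClient admin = AdminClient.create(props)) {
            // Offset the group last committed for this partition (null if nothing committed).
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets("my-transformation-local")
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Earliest and latest offsets still retained in the partition's log.
            Map<TopicPartition, ListOffsetsResultInfo> earliest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.earliest())).all().get();
            Map<TopicPartition, ListOffsetsResultInfo> latest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.latest())).all().get();

            System.out.printf("committed=%s earliest=%d latest=%d%n",
                    committed.get(tp), earliest.get(tp).offset(), latest.get(tp).offset());
        }
    }
}

If the committed offset is below the earliest retained offset, the consumer's fetch position is out of range, which matches the log message above.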

I feel I am missing something really silly, but I could not figure out what it is.


Solution

  • Finally, I found out the reason for my consumer's behaviour. The topic's retention policy was set to delete and the retention time was left at the default (1 week).

    The consumer stopped consuming records because the topic's retention time had elapsed, so all of the data in the topic had been deleted and the consumer had nothing left to read. When I pushed new records into the topic again, the consumer started consuming.
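
For anyone who wants to confirm this kind of retention issue, the topic's cleanup.policy and retention.ms can be read with the AdminClient. This is only a sketch, assuming the same local broker and topic as in the question:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "CraveVideoTestLog");

        try (AdminClient admin = AdminClient.create(props)) {
            // Read the topic's effective configuration and print the retention-related entries.
            Config config = admin.describeConfigs(List.of(topic)).all().get().get(topic);
            System.out.println("cleanup.policy = " + config.get("cleanup.policy").value());
            System.out.println("retention.ms   = " + config.get("retention.ms").value());
        }
    }
}

If older records need to be kept longer, retention.ms can be raised on the topic (for example with the kafka-configs tool or AdminClient.incrementalAlterConfigs).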