
Spring Kafka consumer polling to specific offsets at runtime


We are using Spring Kafka in our Kafka consumer. Per my business requirement, I need to poll the same batch of records again if processing of that batch fails. According to the KafkaConsumer Javadoc (https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html), the section "Offsets and Consumer Position" says: Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:

  • The position of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to poll(Duration).

  • The committed position is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. commitSync and commitAsync).

For my use case, I want to control the first one (the consumer's position). Is there any way to do that?


Solution

  • The SeekToCurrentErrorHandler will reposition the consumer so that the failed record(s) will be redelivered if the listener throws an exception.

    To perform seeks yourself, implement ConsumerSeekAware in the listener; for example, you can use it to seek to the beginning during startup.
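
To illustrate what "repositioning" means here, below is a minimal, library-free sketch of the position versus committed-position distinction quoted in the question. `ToyConsumer` and `SeekDemo` are hypothetical names invented for this illustration; they are not part of kafka-clients or Spring Kafka, and the toy models only a single partition. The method names loosely mirror the real `KafkaConsumer` calls `poll`, `seek(TopicPartition, long)`, `position(TopicPartition)`, and `commitSync()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a consumer on one partition; NOT the real KafkaConsumer. */
class ToyConsumer {
    private final long logEndOffset; // records with offsets 0 .. logEndOffset-1 exist
    private long position = 0;       // offset of the next record poll() will hand out
    private long committed = 0;      // last committed position (what a restart resumes from)

    ToyConsumer(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    /** Returns up to maxRecords offsets and advances the position past them. */
    List<Long> poll(int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEndOffset) {
            batch.add(position++);
        }
        return batch;
    }

    /** Moves the position only; the committed position is untouched. */
    void seek(long offset) {
        position = offset;
    }

    /** Stores the current position as the committed position. */
    void commitSync() {
        committed = position;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }
}

class SeekDemo {
    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(10);

        List<Long> batch = consumer.poll(5); // offsets 0..4; position is now 5
        boolean processingFailed = true;     // pretend the listener threw an exception

        if (processingFailed) {
            consumer.seek(batch.get(0));     // rewind to the first offset of the failed batch
        } else {
            consumer.commitSync();           // only commit once the batch succeeded
        }

        List<Long> retry = consumer.poll(5); // the same offsets are handed out again
        System.out.println(batch.equals(retry)); // prints "true"
    }
}
```

The point of the sketch is that rewinding the position is independent of committing: the failed batch is redelivered by the next poll, and nothing is committed until processing succeeds.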