Tags: spring-boot, apache-kafka, kafka-consumer-api, spring-kafka

Spring Kafka @KafkaListener - Retry sending failed messages and manually commit the offset


I am using @KafkaListener to consume from a Kafka topic. My application logic requires each record to be processed by multiple consumers in different consumer groups.

My problem is this: I have to send each consumed message to a third-party REST endpoint. If sending to the REST endpoint fails, I should not commit the offset, and I need to retry sending the message a configurable number of times. If the message still cannot be sent after the configured number of retries, I need to log the cause of the exception and commit the offset.

I am using Spring Kafka 2.2.0. Below is my configuration for manual acknowledgement:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual-immediate

logging.level.org.springframework.kafka=debug

So far I am able to consume the messages published to a topic from different consumers, and I am also able to manually acknowledge the offset from each consumer. For example:

@KafkaListener(topics = "testTopic", groupId = "group-1")
public void consumerOne(ConsumerRecord<String, String> record, Acknowledgment ack) {
    logger.info("OF PayLoad:{}", record);
    logger.info("OF value:{}", record.value());
    logger.info("OF Offset:{}", record.offset());
    // Commit the offset for this record
    ack.acknowledge();
}

@KafkaListener(topics = "testTopic", groupId = "group-2")
public void consumerTwo(ConsumerRecord<String, String> record, Acknowledgment ack) {
    logger.info("MF PayLoad_2:{}", record);
    logger.info("MF value_2:{}", record.value());
    logger.info("MF Offset_2:{}", record.offset());
    // Deliberately not acknowledged, so the offset is not committed
    //ack.acknowledge();
}

Here, consumerTwo does not acknowledge the message. I want every unacknowledged message to be redelivered to the consumer a configurable number of times.

Any example or suggestion would be a great help.


Solution

  • See this answer.

    In 2.2, we added the ability to add a recoverer to the SeekToCurrentErrorHandler which can take some action.

    The pull request in my final comment shows how to commit the offsets if the recoverer successfully recovers the message. This will be in Thursday's 2.2.4 release.

    In your case, the recoverer can call the REST service and throw an exception if it fails.
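
To make the intended behaviour concrete, here is a minimal plain-Java model of "redeliver until the failure limit is reached, then hand the record to a recoverer and commit". It deliberately uses no Spring classes and is not how the framework is implemented; the class name RetryThenRecoverSketch, the deliver method and the committed list are hypothetical names chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical model of "retry, then recover and commit"; NOT the Spring Kafka implementation.
class RetryThenRecoverSketch {

    // Offsets that were committed, in order (stands in for real offset commits).
    final List<Long> committed = new ArrayList<>();

    private final int maxFailures;                          // configurable failure limit
    private final BiConsumer<String, Exception> recoverer;  // called once the limit is reached

    RetryThenRecoverSketch(int maxFailures, BiConsumer<String, Exception> recoverer) {
        this.maxFailures = maxFailures;
        this.recoverer = recoverer;
    }

    // Delivers one record to the listener and returns the number of delivery attempts.
    // If the recoverer itself throws, the exception propagates and nothing is committed.
    int deliver(long offset, String value, Consumer<String> listener) {
        int failures = 0;
        while (true) {
            try {
                listener.accept(value);      // e.g. the call to the REST endpoint
                committed.add(offset);       // success: commit the offset
                return failures + 1;
            } catch (RuntimeException e) {
                failures++;
                if (failures >= maxFailures) {
                    recoverer.accept(value, e);  // e.g. log the cause of the failure
                    committed.add(offset);       // recovered: commit and move on
                    return failures;
                }
                // Below the limit: the offset stays uncommitted and the same
                // record is delivered again on the next loop iteration.
            }
        }
    }
}
```

With a limit of 3 and a listener that always throws, the record is delivered three times, the recoverer runs once, and only then is the offset committed. In Spring Kafka terms, the listener would throw when the REST call fails and the SeekToCurrentErrorHandler would be given the recoverer and the failure limit; check the reference documentation for the exact constructor and property names in your version.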