spring-boot, spring-kafka, spring-retry

How to acknowledge a message when retries are exhausted in a Kafka consumer


I have a Kafka consumer that retries 5 times, using Spring Kafka with a RetryTemplate. If all retries fail, how does acknowledgment work in that case? Also, if I have set the ack mode to manual, how do I acknowledge those messages?

Consumer

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setRetryTemplate(retryTemplate);
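    // Invoked once the RetryTemplate has exhausted its attempts.
    factory.setRecoveryCallback(context -> {
        log.error("Retries exhausted for record {}",
                context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD));
        // With a manual ack mode the container does not commit the offset itself, so acknowledge here.
        Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        ack.acknowledge();
        return null;
    });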
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

Kafka Listener

@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void receiveSegmentService(String kafkaPayload, Acknowledgment acknowledgment) throws Exception {
    KafkaSegmentTrigger kafkaSegmentTrigger = TransformUtil.fromJson(kafkaPayload, KafkaSegmentTrigger.class);
    log.info("Trigger received from segment service {}", kafkaSegmentTrigger);
    processMessage(kafkaSegmentTrigger);
    // Acknowledge only after processing succeeds; an exception thrown above triggers a retry.
    acknowledgment.acknowledge();
}

Solution

  • If you have a RecoveryCallback that handles the record once retries are exhausted, and you are NOT using manual acks, the container commits the offset for you.

    If you are using MANUAL acks, you can commit the offset in the recovery callback by calling acknowledge() on the Acknowledgment.

    The context object passed to the callback carries that Acknowledgment under the attribute RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT ('acknowledgment').

    The consumer and the record are available the same way, under CONTEXT_CONSUMER and CONTEXT_RECORD.
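
    To make the flow concrete, here is a minimal sketch in plain Java, with no Spring dependency, of what happens when every attempt fails and the recovery callback acknowledges the record. Ack, Listener, deliver and simulate are hypothetical stand-ins invented for this illustration, not Spring Kafka types; only the attribute keys follow the text above ('acknowledgment' from the answer, "record" from the question's callback).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java simulation; none of these types are Spring Kafka classes.
class RecoveryAckSketch {

    /** Hypothetical stand-in for Spring Kafka's Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /** Hypothetical stand-in for a listener method that may throw. */
    interface Listener {
        void onMessage(String payload, Ack ack) throws Exception;
    }

    // Attribute keys as they appear in the question and answer above.
    static final String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";
    static final String CONTEXT_RECORD = "record";

    /**
     * Calls the listener up to maxAttempts times. If every attempt throws,
     * hands a context map to the recovery callback. Returns the attempts made.
     */
    static int deliver(String payload, Ack ack, int maxAttempts,
                       Listener listener, Consumer<Map<String, Object>> recovery) {
        Map<String, Object> context = new HashMap<>();
        context.put(CONTEXT_RECORD, payload);
        context.put(CONTEXT_ACKNOWLEDGMENT, ack);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.onMessage(payload, ack);
                return attempt; // success: the listener acknowledged the record itself
            } catch (Exception e) {
                // swallow the failure and retry until the attempts run out
            }
        }
        recovery.accept(context); // retries exhausted
        return maxAttempts;
    }

    /** Runs one delivery and returns {attempts made, acknowledgments seen}. */
    static int[] simulate(int maxAttempts, boolean alwaysFail) {
        AtomicInteger acks = new AtomicInteger();
        Ack ack = acks::incrementAndGet; // counts each acknowledge() call
        Listener listener = (payload, a) -> {
            if (alwaysFail) {
                throw new IllegalStateException("processing failed");
            }
            a.acknowledge(); // manual ack on the success path
        };
        Consumer<Map<String, Object>> recovery = context -> {
            Ack recovered = (Ack) context.get(CONTEXT_ACKNOWLEDGMENT);
            if (recovered != null) {
                recovered.acknowledge(); // manual ack on the failure path
            }
        };
        int attempts = deliver("payload", ack, maxAttempts, listener, recovery);
        return new int[] {attempts, acks.get()};
    }

    public static void main(String[] args) {
        int[] failed = simulate(5, true);
        System.out.println("attempts=" + failed[0] + " acks=" + failed[1]); // attempts=5 acks=1
        int[] ok = simulate(5, false);
        System.out.println("attempts=" + ok[0] + " acks=" + ok[1]); // attempts=1 acks=1
    }
}
```

    In both runs the record is acknowledged exactly once: by the listener when processing succeeds, and by the recovery callback when all five attempts fail.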