Search code examples
springspring-bootapache-kafkakafka-consumer-apispring-kafka

Spring Kafka consumer - manual commit with recovery callback mechanism


I am building a kafka consumer. I have set the recovery callback similar to below. I have enabled manual commit. How can I acknowledge the message in recovery callback method so that there is no lag.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(conncurrency);
        factory.setConsumerFactory(consumerFactory());
        factory.setRetryTemplate(retryTemplate());
                factory.setRecoveryCallback(new RecoveryCallback<Object>() {
        @Override
        public Object recover(RetryContext context) throws Exception {
            // TODO Auto-generated method stub
            logger.debug(" In recovery callback method !!");
            return null;
        }
    });
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    /*
     * Retry template.
     */

    protected RetryPolicy retryPolicy() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy(maxRetryAttempts, retryableExceptions);
        return policy;
    }

    protected BackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
        policy.setInitialInterval(initialRetryInterval);
        policy.setMultiplier(retryMultiplier);
        return policy;
    }

    protected RetryTemplate retryTemplate() {
       RetryTemplate template = new RetryTemplate();
       template.setRetryPolicy(retryPolicy());
       template.setBackOffPolicy(backOffPolicy());
       return template;
    }
}

Solution

  • Your question is too broad. You need to be more specific.

    There is no any assumption in the Framework what you could do in case of retry exhausting during consumption errors.

    I think you should start from the Spring Retry project to understand what is that RecoveryCallback at all and how it works:

    If the business logic does not succeed before the template decides to abort, then the client is given the chance to do some alternate processing through the recovery callback.

    A RetryContext has:

    /**
     * Accessor for the exception object that caused the current retry.
     * 
     * @return the last exception that caused a retry, or possibly null. It will be null
     * if this is the first attempt, but also if the enclosing policy decides not to
     * provide it (e.g. because of concerns about memory usage).
     */
    Throwable getLastThrowable();
    

    Also Spring Kafka populates additional attributes to that RetryContext to deal with in the RecoveryCallback: https://docs.spring.io/spring-kafka/docs/2.0.0.RELEASE/reference/html/_reference.html#_retrying_deliveries

    The contents of the RetryContext passed into the RecoveryCallback will depend on the type of listener. The context will always have an attribute record which is the record for which the failure occurred. If your listener is acknowledging and/or consumer aware, additional attributes acknowledgment and/or consumer will be available. For convenience, the RetryingAcknowledgingMessageListenerAdapter provides static constants for these keys. See its javadocs for more information.