Search code examples
spring-bootapache-kafkaspring-kafkaconsumerkafka-topic

How to create a Kafka consumer listener with a spring start that, in case of rejection of the message, retry to consume them after a variable time


I have a simple kafka consumer listener in a springboot application, like this:

@KafkaListener(topics="mytopic")
public void receive(String message) {
   LOGGER.info("received message='{}'", messge);
}

in some particular cases I would like to reject the message, but I would like the system to propose it to me again after a certain time;

how can I do?

Note: I would also like the kafka configuration to be taken custom-made (not default springboot structure)


Solution

  • my implementation does just the thing you need:

    1) kafka configuration class that takes the fields from the custom property and retry the rejected messages after 5000 milliseconds (inside the kafkaListenerContainerFactory method):

    @Configuration
    public class KafkaConfig {
    
        //...
    
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
            if(enableSsl) {
                //configure the following three settings for SSL Encryption
                props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
                props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  sslPassword);
    
                // configure the following three settings for SSL Authentication
                props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
                props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslPassword);
                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslPassword);
            }
            return props;
        }
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {              
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    
            ContainerProperties containerProperties = factory.getContainerProperties();
            containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    
            RetryTemplate retryTemplate = new RetryTemplate();
            factory.setStatefulRetry(false);
            factory.setRetryTemplate(retryTemplate);
    
            //infinite number of retry attempts
            retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
    
            //wait a "waitingTime" time before retrying
            int waitingTime = 5000;
            FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
            fixedBackOffPolicy.setBackOffPeriod(waitingTime);
            retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    
            //or use exponential waiting
            //ExponentialBackOffPolicy expBackoff = new ExponentialBackOffPolicy();
            //expBackoff.setInitialInterval(...);
            //expBackoff.setMaxInterval(...);
            //retryTemplate.setBackOffPolicy(expBackoff);
    
            return factory;
        }
    }
    

    2) class that consumes messages:

    @Service
    public class Consumer {
    
        private final Logger logger = LoggerFactory.getLogger(Consumer.class);
    
        //...
    
        @KafkaListener(topics="${kafka.topics.test}")
        public void consume(String message, Acknowledgment ack) throws IOException {
            if(processMessage) {
                logger.info(String.format("##KAFKA## -> Consumed message -> %s", message)); 
                ack.acknowledge();
            } 
            else { 
                logger.error(String.format("##KAFKA## -> Failed message -> %s", message));  
                throw new IOException("reject message");
            }
        }
    }