
Kafka consumer not retrying a message that failed to process


I am writing a Kafka consumer. I have set the acknowledgment mode to manual, so whenever the consumer fails to process a message, I don't ack it. Now I want the consumer to reprocess this failed message immediately, but it doesn't.

This is what my consumer config class looks like.

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "${crmdsforecast.judjement-consumer-groupId}");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            System.out.println("Error while processing the record: " + exception.getCause().getMessage());
        }, new FixedBackOff(3000L, 2L));
        factory.setErrorHandler(errorHandler);
        
        return factory;
    }

My Consumer method looks like this.

    @KafkaListener(containerFactory = "kafkaListenerContainerFactory", id = "${id}", topics = "${topicname}")
    public void consume(String message, Acknowledgment acknowledgment) {
        Dto payload = new Dto();
        try{
            payload = payloadDeserializer.convertIntoDtoObject(message);
            
            if(payload != null)
                //Do Something;
            
            acknowledgment.acknowledge();
        }
        catch (JsonProcessingException e) {
            log.error("Error occurred while deserializing the String input {}", message);
            acknowledgment.acknowledge();
        }
        catch (Exception e) {
            log.error("Some error occurred while updating revenueLines {}", e.getMessage());
            // Here I expect the consumer to re-read the message when an error occurs.
        }
    }

I tried multiple solutions, including SeekToCurrentBatchErrorHandler. I changed the containerFactory method to the one below.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentBatchErrorHandler errorHandler = new SeekToCurrentBatchErrorHandler();
        errorHandler.setBackOff( new ExponentialBackOff(500L, 2L));
        factory.setBatchErrorHandler(errorHandler);
        factory.setBatchListener(true);
        factory.setConsumerFactory(consumerFactory());
        
        return factory;
    }

I also tried using DefaultErrorHandler by creating a separate bean and setting it as the common error handler, but that didn't work either.

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(1000, 3);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
            System.out.println(String.format("consumed record %s because this exception was thrown: %s", consumerRecord.toString(), e.getClass().getName()));
        }, fixedBackOff);
        errorHandler.addNotRetryableExceptions(NullPointerException.class);
        return errorHandler;
    }

Can anyone help with this?


Solution

  • Kafka maintains two pointers per partition: the committed offset and the current position. They are unrelated, except that the position is initialized to the committed offset when a consumer first starts.

    Not acknowledging a record will not cause it to be redelivered because the position doesn't change.

    You are catching the exceptions.

    You need to throw the exception to the container; the default error handler will perform a seek to reposition the partition and redeliver the record.

    See the documentation for Spring for Apache Kafka.

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
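Concretely, the listener from the question can be rewritten so that processing failures reach the container. This is a minimal sketch reusing the question's own `Dto`, `payloadDeserializer`, and property placeholders; nothing else is assumed beyond a container factory with an error handler configured:

```java
@KafkaListener(containerFactory = "kafkaListenerContainerFactory", id = "${id}", topics = "${topicname}")
public void consume(String message, Acknowledgment acknowledgment) {
    try {
        Dto payload = payloadDeserializer.convertIntoDtoObject(message);
        if (payload != null) {
            // Do something with the payload
        }
        acknowledgment.acknowledge();
    } catch (JsonProcessingException e) {
        // A malformed message will never deserialize successfully,
        // so acknowledge it to skip it rather than retry forever.
        log.error("Error occurred while deserializing the String input {}", message, e);
        acknowledgment.acknowledge();
    }
    // Deliberately no catch (Exception e): any other exception propagates
    // to the container, whose error handler seeks the partition back to the
    // failed record and redelivers it according to the configured BackOff.
}
```

If you define a separate `DefaultErrorHandler` bean as in the question, it still has to be attached to the factory (in spring-kafka 2.8+, via `factory.setCommonErrorHandler(errorHandler())`); otherwise the container uses its own default handler, which retries with `FixedBackOff(0L, 9L)` before invoking the recoverer.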