apache-kafka, kafka-consumer-api, spring-kafka

Spring Kafka Consumer with At-Least-Once Semantics


This is my requirement:

  1. I need to read data from 2 topics.
  2. I need to read 1 message at a time and call the DB.
  3. Once the DB call is completed (irrespective of success or failure), I need to commit the Kafka offset.

I have done the below configuration. Is this the correct way to implement AT LEAST ONCE semantics while reading and processing 1 message at a time?

I am fine with duplicate messages being read by the Kafka consumer (although I would like to reduce duplicates).

    @EnableKafka
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(2);

            return factory;
        }
    }

    @KafkaListener(topics = "mytopic-1")
    public void consumeFromTopic1(String message, ConsumerRecordMetadata meta)
    {
        dbservice.callDB(message);
    }

    @KafkaListener(topics = "mytopic-2")
    public void consumeFromTopic2(String message, ConsumerRecordMetadata meta)
    {
        dbservice.saveInDB(message);
    }

Solution

  • No. ENABLE_AUTO_COMMIT_CONFIG should be false; the listener container will then commit the offset reliably, in a deterministic fashion, when the listener exits. A corrected configuration is sketched below.
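
A minimal sketch of the corrected configuration, assuming the same bean layout as the question. Two incidental adjustments beyond the auto-commit fix: the question declares ConsumerFactory<Integer, String> but configures a StringDeserializer for keys, so the sketch uses <String, String> for consistency; and it sets AckMode.RECORD so the container commits after each record's listener returns (with max.poll.records = 1 the default BATCH mode behaves the same way, so this is optional).

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-mytopic");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
            // Let the listener container manage offset commits deterministically.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(2);
            // Commit the offset after each record's listener completes.
            // At-least-once: a crash between the DB call and the commit means
            // the record is redelivered on restart, never silently lost.
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            return factory;
        }
    }

Note that the container only commits when the listener exits normally. Since requirement 3 asks for a commit irrespective of success or failure, the listener should catch any exception from the DB call itself; if it lets the exception propagate, the container's error handler will kick in and the record will typically be redelivered rather than committed.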