Search code examples
spring-bootapache-kafkaspring-kafka

poll() and commitAsync() in Spring-Kafka


I am trying to write a kafka consumer application in java on Springboot platform. Earlier, I have written code in plain java but now converting into spring-kafka as it can give some advantage over plain java. I do have few questions that I am trying to understand.

  • It seems that I don't have to explicitly poll() loop in spring-kafka and it would be handled automatically by @KafkaListener?

  • I have set enable.auto.commit='false', As I have to do some processing before committing offsets, how can I perform commitAsync() in Spring-Kafka?

    ConsumerConfig.java :

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
    
        @Value("${app.kafka_brokers}")
        private String KAFKA_BROKERS;
    
        @Value("${app.topic}")
        private String KAFKA_TOPIC;
    
        @Value("${app.group_id_config}")
        private String GROUP_ID_CONFIG;
    
        @Value("${app.schema_registry_url}")
        private String SCHEMA_REGISTRY_URL;
    
        @Value("${app.offset_reset}")
        private String OFFSET_RESET;
    
        @Value("${app.max_poll_records}")
        private String MAX_POLL_RECORDS;
    
        @Value("${app.security.protocol}")
        private String SSL_PROTOCOL;
    
        @Value("${app.ssl.truststore.password}")
        private String SSL_TRUSTSTORE_SECURE;
    
        @Value("${app.ssl.keystore.password}")
        private String SSL_KEYSTORE_SECURE;
    
        @Value("${app.ssl.key.password}")
        private String SSL_KEY_SECURE;
    
        @Value("${app.ssl.truststore.location}")
        private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
    
        @Value("${app.ssl.keystore.location}")
        private String SSL_KEYSTORE_LOCATION_FILE_NAME;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory(){
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    
            return new DefaultKafkaConsumerFactory<>(props);
    
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> 
        kafkaListenerContainerFactory() {
    
          ConcurrentKafkaListenerContainerFactory<String, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3);
          return factory;
      }
    
    }
    

KafkaConsumer.java :

@Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        
        System.out.println(record);
        
    <-- how to asyncCommit()--> 
    }

}

Solution

  • First of all, I suggest you use the properties and AutoConfiguration set by Spring kafka instead of creating your own as it follows the DRY Principle: Don't Repeat Yourself.

    spring:
      kafka:
        bootstrap-servers: ${app.kafka_brokers}
        consumer:
          auto-offset-reset: ${app.offset_reset}
          enable-auto-commit: false   // <---- disable auto committing
        ssl:
          protocol: ${app.security.protocol}
          key-store-location: ${app.ssl.keystore.location}
          key-store-password:  ${app.ssl.keystore.password}
          trust-store-location: ${app.ssl.truststore.location}
          trust-store-password: ${app.ssl.truststore.password}
      // And other properties
        listener:
          ack-mode: manual // This is what you need
    

    The AckMode docs: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

    Essentially, manual is an asynchronous acknowledgment, while manual_immediate is synchronous.

    Then inside your @KafkaListener component you can inject org.springframework.kafka.support.Acknowledgment object acknowledge your message.

    @Component
    public class KafkaConsumer {
        
        @KafkaListener(topics = "topic", groupId = "group")
        public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
            
            System.out.println(record);
            
            acknowledgment.acknowledge();
        }
    
    }
    

    Here's the documentation for what can be injected into a @KafkaListener method: https://docs.spring.io/spring-kafka/reference/html/#message-listeners