Search code examples
javaspring-kafka

Passing Acknowledgment to a spring KafkaListener consumer method


I am trying to turn off auto committing in kafka, and instead do it manually. To that end, in my application.properties I have set spring.kafka.properties.enable.auto.commit=false

I also currently have a method with the following header:

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.PPA_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers)

My understanding is that in order to manually commit I need access to the Acknowledgement object, which would be passed in as a parameter to my receive() method. My question: if I change the header to

@KafkaListener(id="${"+ KafkaConfiguration.APP_REQUEST_ID +"}", topics = "${"+KafkaConfiguration.APP_REQUEST_TOPIC +"}")
public void receive(@Payload String message,
                    @Headers MessageHeaders headers,
                    Acknowledgment acknowledgment)

Will the Acknowledgment automatically be passed in, or are there other changes I need to make?


Solution

  • yes, that way an Acknowledgment instance would be passed into your listener method. after successful processing of the received message you should call acknowledgement.acknowledge(); (only needed if you want to manually ack)

    I'd also switch to MANUAL ackmode and turn off auto-commit (what you already did), e.g. by providing a custom Spring Boot configuration class - maybe also configurable via application.properties:

    @Configuration
    class KafkaConfiguration {
    
            @Bean
            ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
                final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
                consumerProperties.put(ENABLE_AUTO_COMMIT_CONFIG, false);
    
                ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
                factory.getContainerProperties().setAckMode(MANUAL);
    
                return factory;
            }
        }
    }
    

    If you do NOT want to manually acknowledge then a different ackmode might be more convenient and a better fit:

    https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html

    AckMode.RECORD is quite comfortable, since the Kafka record that was passed into your listener method will be automatically be acked if your listener's method implementation completes successfully (no exception is thrown).