Search code examples
javaspringspring-bootapache-kafkaspring-kafka

Kafka No Acknowledgment available as an argument


we are trying to implement Kafka Acknowledgment in java spring project. Without the Acknowledgment, we successfully receive and read the message but when we add the Acknowledge in the method we receive this error:

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: 
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
 nested exception is java.lang.IllegalStateException:
 No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
 Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from 
 [proto.DeviceModelOuterClass$DevModel] to
 [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"

The way we implement the Acknowledge is like described in the Kafka API:

  @KafkaListener(
      id = Constants.TOPIC_LISTENER,
      topics = "${info.dev.name}",
      autoStartup = "false",
     properties = {
        "value.deserializer=com.cit.iomt.core.DevModelDeserializer",
        "key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
      })
  public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
    LOG.info(Constants.READ_KAFKA_TOPIC, message);
 a.acknowledge();}

And in the properties file we have this:

spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest

Solution

  • Are you defining a custom KafkaListenerContainerFactory bean by any chance? In my case, I was and in such a case, the spring.kafka.listener.ack-mode=manual_immediate config has no effect. I had to set the ack mode programmatically into the kafkaListenerContainerFactory bean. My container factory bean definition(in Kotlin) ended up looking like below:

        @Bean
        fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Any, Any>): KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>> {
            var containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
            containerFactory.consumerFactory = consumerFactory
            containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
    
            return containerFactory as KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>>
        }