we are trying to implement Kafka Acknowledgment in java spring project. Without the Acknowledgment, we successfully receive and read the message but when we add the Acknowledge in the method we receive this error:
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from
[proto.DeviceModelOuterClass$DevModel] to
[org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"
The way we implement the Acknowledge is like described in the Kafka API:
@KafkaListener(
id = Constants.TOPIC_LISTENER,
topics = "${info.dev.name}",
autoStartup = "false",
properties = {
"value.deserializer=com.cit.iomt.core.DevModelDeserializer",
"key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
})
public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
LOG.info(Constants.READ_KAFKA_TOPIC, message);
a.acknowledge();}
And in the properties file we have this:
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
Are you defining a custom KafkaListenerContainerFactory
bean by any chance? In my case, I was and in such a case, the spring.kafka.listener.ack-mode=manual_immediate
config has no effect. I had to set the ack mode programmatically into the kafkaListenerContainerFactory
bean. My container factory bean definition(in Kotlin) ended up looking like below:
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Any, Any>): KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>> {
var containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
containerFactory.consumerFactory = consumerFactory
containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
return containerFactory as KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>>
}