I have an application where I would like to perform (n)ack in the Kafka messages manually. According to spring cloud documentation, it should be done with autoCommitOffset
spring cloud documentation
However, in my application, even defining such property the header KafkaHeaders.ACKNOWLEDGMENT
is still coming as null.
Here is what my configuration looks like
spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false
And my consumer:
@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
MyTopic payload = message.getPayload();
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}
I am using java 13 with spring boot 2.2.5.RELEASE and spring cloud Hoxton.SR1
Any help is appreciated.
I found why my consumer was not working as expected:
In my configuration, I have something like spring.cloud.stream.bindings. mytopic.destination=MyInputTopic
, however, the stream binding was done like this:
@StreamListener("Mytopic")
Apparently, the configurations prefixed with spring.cloud.stream.bindings
are not case sensitive (as all the configurations worked as expected), but the ones prefixed with spring.cloud.stream.kafka.bindings
are case sensitive leading to my issue.