Search code examples
javaspring-bootapache-kafkaspring-cloudspring-cloud-stream

Auto commit in kafka with spring cloud stream


I have an application where I would like to perform (n)ack in the Kafka messages manually. According to spring cloud documentation, it should be done with autoCommitOffset spring cloud documentation

However, in my application, even defining such property the header KafkaHeaders.ACKNOWLEDGMENT is still coming as null.

Here is what my configuration looks like

spring.cloud.stream.kafka.binder.brokers=${KAFKA_BROKER_LIST}
spring.cloud.stream.default.contentType=application/json
spring.cloud.stream.bindings.mytopic.destination=MyInputTopic
spring.cloud.stream.bindings.mytopic.group=myConsumerGroup
spring.cloud.stream.kafka.bindings.mytopic.consumer.autoCommitOffset=false

And my consumer:

@StreamListener("myTopic")
public void consume(@NotNull @Valid Message<MyTopic> message) {
    MyTopic payload = message.getPayload();
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); // always null
}

I am using java 13 with spring boot 2.2.5.RELEASE and spring cloud Hoxton.SR1

Any help is appreciated.


Solution

  • I found why my consumer was not working as expected:

    In my configuration, I have something like spring.cloud.stream.bindings. mytopic.destination=MyInputTopic, however, the stream binding was done like this:

    @StreamListener("Mytopic")

    Apparently, the configurations prefixed with spring.cloud.stream.bindings are not case sensitive (as all the configurations worked as expected), but the ones prefixed with spring.cloud.stream.kafka.bindings are case sensitive leading to my issue.