I want to manually commit the offset using Spring Cloud Stream, but only when message processing succeeds. Here is my code: the handler class, followed by application.yml.
public void process(Message<?> message) {
    System.out.println(message.getPayload());
    // The header is only present when the binder is configured for manual acknowledgment
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if (acknowledgment != null) {
        System.out.println("Acknowledgment provided");
        acknowledgment.acknowledge();
    }
}
---------------------------------------------------------------------------------
spring:
  application:
    name: springCloud
  cloud:
    stream:
      default-binder: kafka
      kafka:
        bindings:
          myChannel:
            consumer:
              autoCommitOffset: false
But my Acknowledgment object is null: the 'kafka_acknowledgment' header itself is not present in the message headers.
What version are you using?
In 3.1, autoCommitOffset was deprecated in favor of setting the ackMode (to manual in this case); however, it looks like autoCommitOffset is now completely ignored rather than deprecated.
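
As a minimal sketch of that change, assuming the myChannel binding from the question and a binder version of 3.1 or later, the consumer section would set ackMode where autoCommitOffset used to be:

```yaml
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        bindings:
          myChannel:
            consumer:
              # Takes the place of autoCommitOffset: false on 3.1 and later
              ackMode: manual
```

With manual ack mode, the binder should populate the acknowledgment header that the handler already checks for. It is worth confirming the property against the binder documentation for your exact version.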