spring-kafka  spring-cloud-stream  spring-cloud-stream-binder-kafka

Spring Cloud Stream - Kafka - Null Acknowledgment Header


I want to commit the offset manually with Spring Cloud Stream, and only when message processing succeeds. Here are my handler method and application.yml:

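    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.Message;

    public void process(Message<?> message) {
        System.out.println(message.getPayload());
        // Look up the manual-acknowledgment header; it may be absent.
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge(); // commit the offset for this record
        }
    }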
---------------------------------------------------------------------------------
spring:
  application:
    name: springCloud
  cloud:
    stream:
      default-binder: kafka
      kafka:
        bindings:
          myChannel:
            consumer:
              autoCommitOffset: false

But my Acknowledgment object is null: the 'kafka_acknowledgment' header itself is NOT present in the message headers.

  1. How do I get the Acknowledgment object?
  2. I need to commit the offset ONLY if processing succeeds. If processing fails, I do NOT want the message removed from the channel, so that it can be read again later. Is the code above sufficient to achieve this?

Solution

  • What version are you using?

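    In 3.1, autoCommitOffset was deprecated in favor of setting the consumer's ackMode (to MANUAL in this case). However, it looks like autoCommitOffset is now ignored entirely rather than merely deprecated, which would explain why the acknowledgment header is missing with the configuration above.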
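
    Assuming the binding is switched to manual acknowledgment (for the binding above, the consumer property would be spring.cloud.stream.kafka.bindings.myChannel.consumer.ackMode: MANUAL), the "acknowledge only on success" pattern from the question can be sketched as below. This is a minimal, framework-free illustration rather than the binder's actual API: Ack is a hypothetical stand-in for org.springframework.kafka.support.Acknowledgment, and CountingAck and handle are names invented for the example so that it runs without Spring on the classpath.

```java
import java.util.function.Consumer;

public class AckOnSuccess {

    /** Hypothetical stand-in for org.springframework.kafka.support.Acknowledgment. */
    interface Ack {
        void acknowledge();
    }

    /** Test double that counts how many times an offset commit was requested. */
    static class CountingAck implements Ack {
        int count = 0;

        @Override
        public void acknowledge() {
            count++;
        }
    }

    /**
     * Runs the business logic and acknowledges only if it returns normally.
     * Returns true when the record was acknowledged.
     */
    static boolean handle(String payload, Ack ack, Consumer<String> businessLogic) {
        try {
            businessLogic.accept(payload);
        } catch (RuntimeException e) {
            // Processing failed: skip acknowledge() so no commit is requested for this record.
            return false;
        }
        if (ack == null) {
            // Header absent (for example, manual ack mode is not active): nothing to commit.
            return false;
        }
        ack.acknowledge();
        return true;
    }

    public static void main(String[] args) {
        CountingAck ack = new CountingAck();
        boolean first = handle("ok", ack, p -> System.out.println("processed " + p));
        boolean second = handle("bad", ack, p -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println("first acknowledged=" + first
                + ", second acknowledged=" + second
                + ", commits requested=" + ack.count);
    }
}
```

    One caveat on the second question: skipping acknowledge() only withholds the commit. It does not rewind the consumer, so the failed record is normally not redelivered until a restart or rebalance, and acknowledging a later record on the same partition commits past it. Reprocessing a failed record in place usually relies on the binder's retry or error-handling configuration instead.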