Tags: spring-cloud-stream, spring-cloud-dataflow

Manual Acknowledgement (Checkpointing) of Messages: Spring Cloud Stream Kinesis Binder


We are trying to port our Spring Cloud Stream application, which currently consumes messages from Kafka, to AWS Kinesis. We require manual acknowledgement to handle certain timeout conditions.

For Kafka, we set the autoCommitOffset property to false and use the acknowledgment header (KafkaHeaders.ACKNOWLEDGMENT) to acknowledge messages manually.

I went through the Spring Cloud Stream documentation, including the following:

https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/
https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

But I could not find a solution. Any pointers would be very helpful.


Solution

  • After some more searching, I found the solution:

    In Kinesis, a shard is the equivalent of a Kafka partition, and a checkpoint is the equivalent of an offset.

    In application.yml:

    spring:
      cloud:
        stream:
          kinesis:
            bindings:
              consume-in-0:   # binding name is <functionName>-in-<index>, so it must match the consume() bean
                consumer:
                  checkpointMode: manual
    

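    Sample code for checkpointing:

    import java.util.function.Consumer;

    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.aws.inbound.kinesis.Checkpointer;
    import org.springframework.integration.aws.support.AwsHeaders;
    import org.springframework.messaging.Message;

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            System.out.println("message received : " + message.getPayload());
            System.out.println("message headers : " + message.getHeaders());
            // In manual mode the Checkpointer is delivered in the message headers.
            Checkpointer checkpointer = message.getHeaders().get(AwsHeaders.CHECKPOINTER, Checkpointer.class);
            if (checkpointer != null) {
                checkpointer.checkpoint(); // save the position reached in this shard
            }
        };
    }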
    

    Reference: Kinesis Consumer Binder Properties
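
The question mentions timeout conditions, so here is a minimal sketch of one way to checkpoint only when processing finishes in time. It uses plain JDK classes only. The class name TimeoutAwareCheckpoint and the method processThenCheckpoint are hypothetical helpers, not part of the Kinesis binder; the checkpoint argument is assumed to be something like checkpointer::checkpoint taken from the AwsHeaders.CHECKPOINTER header shown above.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the Kinesis binder.
final class TimeoutAwareCheckpoint {

    /**
     * Runs the work with a time limit and calls checkpoint only if the work
     * completed normally within that limit. Returns true if it checkpointed.
     */
    static boolean processThenCheckpoint(Runnable work, Runnable checkpoint, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = executor.submit(work);
            result.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait for completion or time out
            checkpoint.run(); // success: record the position as processed
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false; // timed out or failed: leave the last checkpoint untouched
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            executor.shutdownNow(); // interrupt work that is still running
        }
    }

    public static void main(String[] args) {
        Runnable slowWork = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean fast = processThenCheckpoint(() -> { }, () -> System.out.println("checkpointed"), 1000);
        boolean slow = processThenCheckpoint(slowWork, () -> System.out.println("checkpointed"), 100);
        System.out.println("fast=" + fast + ", slow=" + slow);
    }
}
```

Inside the consumer this could be called as processThenCheckpoint(() -> handle(message), checkpointer::checkpoint, 5000), where handle is a placeholder for your own processing logic. As far as I understand it, skipping the checkpoint does not redeliver the record immediately; records after the last checkpoint are read again when the shard consumer restarts, so any retry on timeout would still need to be handled in the application.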