We are trying to port our Spring Cloud Stream application, which consumes messages from Kafka, to AWS Kinesis. We require manual acknowledgement to handle certain timeout conditions.
For Kafka, we set the autoCommitOffset property to false and use the ACKNOWLEDGMENT header to handle the manual acknowledgement.
I went through the Spring Cloud Stream documentation, including the pages below:
https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/
https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc
But I could not find a solution. Any pointers would be very helpful.
After some more searching, I found the solution:
In Kinesis, a shard is equivalent to a Kafka partition, and a checkpoint is equivalent to an offset.
In application.yml:

    spring:
      cloud:
        stream:
          kinesis:
            bindings:
              consumer-in-0:
                consumer:
                  checkpointMode: manual
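Sample code for checkpointing:

    import java.util.function.Consumer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.aws.inbound.kinesis.Checkpointer;
    import org.springframework.integration.aws.support.AwsHeaders;
    import org.springframework.messaging.Message;

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            System.out.println("message received : " + message.getPayload());
            System.out.println("message headers : " + message.getHeaders());
            // In manual checkpoint mode the binder passes a Checkpointer in the headers.
            Checkpointer checkpointer = message.getHeaders().get(AwsHeaders.CHECKPOINTER, Checkpointer.class);
            if (checkpointer != null) {
                checkpointer.checkpoint(); // record this shard position as processed
            }
        };
    }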
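Since the original requirement was manual acknowledgement for timeout conditions, here is a rough sketch of how the checkpoint call could be made conditional on processing finishing in time. It is plain Java with no Spring dependency: `CheckpointAction` is a hypothetical stand-in for the binder's `Checkpointer`, and `processWithTimeout` and the timeout values are assumptions made up for illustration, not part of the Kinesis binder. The idea is to checkpoint only on success, so that a record that timed out stays uncheckpointed and can, as far as I understand, be read again from the last checkpoint.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class TimeoutCheckpointSketch {

    /** Hypothetical stand-in for the binder's Checkpointer; only checkpoint() is modeled. */
    interface CheckpointAction {
        void checkpoint();
    }

    /**
     * Runs the handler on another thread and checkpoints only if it
     * finishes within timeoutMillis. Returns true when checkpointed.
     */
    static boolean processWithTimeout(String payload, Consumer<String> handler,
                                      CheckpointAction checkpointer, long timeoutMillis) {
        CompletableFuture<Void> work = CompletableFuture.runAsync(() -> handler.accept(payload));
        try {
            work.get(timeoutMillis, TimeUnit.MILLISECONDS);
            checkpointer.checkpoint(); // processing finished in time
            return true;
        } catch (TimeoutException | ExecutionException e) {
            // Timed out or failed: skip the checkpoint so the position is not advanced.
            work.cancel(true);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CheckpointAction log = () -> System.out.println("checkpointed");
        // Fast handler: completes well within the timeout.
        System.out.println(processWithTimeout("fast", p -> { }, log, 500));
        // Slow handler: sleeps longer than the timeout, so no checkpoint is made.
        System.out.println(processWithTimeout("slow", p -> sleep(2000), log, 100));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the real consumer, the `CheckpointAction` argument would be replaced by the `Checkpointer` taken from the `AwsHeaders.CHECKPOINTER` header, for example by passing `checkpointer::checkpoint`.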
Reference: Kinesis Consumer Binder Properties