We are using Spring Cloud Stream with Kafka and are looking for exactly-once semantics. We have one solution that is working as expected:
1) Enabling idempotence & transactions on the producer.
2) Using a MetadataStore on the consumer side to detect duplicate messages, with the key (offsetId + partitionId + topicName).
With the above solution we have no message loss and no duplicate processing.
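A simplified sketch of that duplicate check, using the functional binding style (the function name and key format are illustrative, and it assumes a MetadataStore bean such as SimpleMetadataStore is defined elsewhere; in spring-kafka 3.x the partition header constant is RECEIVED_PARTITION rather than RECEIVED_PARTITION_ID):

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.metadata.MetadataStore;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Configuration
public class DedupConsumerConfig {

    // Skips a record if its (offset + partition + topic) key was seen before.
    @Bean
    public Consumer<Message<String>> process(MetadataStore metadataStore) {
        return message -> {
            String key = message.getHeaders().get(KafkaHeaders.OFFSET) + "-"
                    + message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID) + "-"
                    + message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
            if (metadataStore.get(key) != null) {
                return; // duplicate delivery; already processed
            }
            // ... business logic here ...
            metadataStore.put(key, "processed");
        };
    }
}
```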
But we have now found that the Kafka API producer.sendOffsetsToTransaction() can prevent duplicate processing on the consumer side without any MetadataStore logic. I am not sure how to achieve this with Spring Cloud Stream using sendOffsetsToTransaction().
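For context, with the plain Kafka client the consume-transform-produce pattern using sendOffsetsToTransaction() looks roughly like this (topic names, group id, and servers are placeholders; error/abort handling is omitted for brevity, and clients older than 2.5 use the overload that takes the group id string instead of groupMetadata()):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExactlyOnceLoop {

    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "demo-group");
        cp.put("enable.auto.commit", "false");       // offsets are committed via the transaction
        cp.put("isolation.level", "read_committed"); // skip records from aborted transactions
        cp.put("key.deserializer", StringDeserializer.class.getName());
        cp.put("value.deserializer", StringDeserializer.class.getName());

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");
        pp.put("transactional.id", "demo-tx-1");     // enables idempotence + transactions
        pp.put("key.serializer", StringSerializer.class.getName());
        pp.put("value.serializer", StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
            producer.initTransactions();
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec : records) {
                    producer.send(new ProducerRecord<>("output-topic", rec.key(), rec.value()));
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                            new OffsetAndMetadata(rec.offset() + 1));
                }
                // Commits the consumed offsets atomically with the produced records
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}
```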
It's handled by the framework automatically if you add a KafkaTransactionManager to the application context. You have to add a transaction id prefix to the configuration:

spring.kafka.producer.transaction-id-prefix

and Boot will automatically add a transaction manager. See the binder's producer properties:
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
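So with the binder, a minimal transactional configuration might look like this (the prefix value and the example producer override are illustrative):

```
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
# with transactions enabled, producer settings come from the transaction.producer.* namespace
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
```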
The listener container sends the offset to the transaction before committing the transaction when the listener exits normally.
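With transactions enabled in the binder, the processing code itself therefore needs no dedup logic; a minimal sketch (the function name and configuration class are assumptions):

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TransactionalProcessorConfig {

    // Consume -> process -> publish -> offset commit all happen in one
    // Kafka transaction managed by the container and the binder.
    @Bean
    public Function<String, String> process() {
        return payload -> payload.toUpperCase(); // illustrative business logic
    }
}
```

Note that downstream consumers should set isolation.level=read_committed so they only see records from committed transactions.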