kafka-consumer-api, kafka-producer-api, spring-cloud-stream

We are using Spring Cloud Stream for Kafka and are looking for exactly-once semantics with the consumer API


We are using Spring Cloud Stream for Kafka and are looking for exactly-once semantics. We have one solution that works as expected:

1. Enabling idempotence and transactions on the producer
2. Using a MetadataStore on the consumer side to detect duplicate messages, keyed by (offsetId + partitionId + topicName)

With the above solution we have no message loss and no duplicate processing.

But now we have found a method in the Kafka API, producer.sendOffsetsToTransaction, which would let us avoid duplicate processing on the consumer side without any MetadataStore logic. I am not sure how to achieve this with Spring Cloud Stream using sendOffsetsToTransaction.


Solution

  • It's handled by the framework automatically if you add a KafkaTransactionManager to the application context.

    You have to add a transaction id prefix to the configuration.

    spring.kafka.producer.transaction-id-prefix

    and Boot will automatically add a transaction manager.

    See producer properties.
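For Spring Boot's auto-configuration, this is a minimal configuration sketch (the `tx-` prefix value is just an example):

```properties
# Setting this property enables transactions on the producer factory;
# Boot then auto-configures a KafkaTransactionManager for you.
spring.kafka.producer.transaction-id-prefix=tx-
```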

    spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

    Enables transactions in the binder. See transaction.id in the Kafka documentation and Transactions in the spring-kafka documentation. When transactions are enabled, individual producer properties are ignored and all producers use the spring.cloud.stream.kafka.binder.transaction.producer.* properties.
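For the Spring Cloud Stream Kafka binder specifically, the equivalent sketch in `application.yml` might look like this (prefix value and the `acks` setting are illustrative, not required values):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            # Enables transactions in the binder
            # (relaxed-binding form of transactionIdPrefix)
            transaction-id-prefix: tx-
            producer:
              # When transactions are enabled, all producers use
              # these binder-level transaction.producer.* properties
              configuration:
                acks: all
```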

    The listener container sends the offset to the transaction before committing the transaction when the listener exits normally.