apache-kafka, kafka-producer-api, spring-kafka

Confused about preventing duplicates with new Kafka idempotent producer API


My app has 5+ consumers consuming from five partitions on a Kafka topic (using Kafka version 0.11). Each consumer produces a message to another topic, saves some state to the database, then does a MANUAL_IMMEDIATE acknowledgement and moves on to the next message.

I'm trying to handle the scenario where a consumer emits successfully to the outbound topic and then fails (or we lose the consumer) before acknowledging. When another consumer takes over the partition, it will emit ANOTHER message to the outbound topic. This is bad :(

I discovered that Kafka now has idempotent producers, but from what I read, idempotency is only guaranteed within a single producer session.

"When producer restarts, new PID gets assigned. So the idempotency is promised only for a single producer session" - (blog) - https://hevodata.com/blog/kafka-exactly-once

This seems largely useless to me. In my use case, the whole point is that when a message is replayed on another consumer, it does not duplicate the outbound message.

Is there something I'm missing?


Solution

  • When using transactions, you shouldn't use ANY consumer-based mechanism, manual or otherwise, to commit the offsets.

    Instead, you use the producer to send the offsets to the transaction so the offset commit is part of the transaction.

    If configured with a KafkaTransactionManager or a ChainedKafkaTransactionManager, the Spring listener container will send the offsets to the transaction when the listener exits normally.

    If you don't use a Kafka transaction manager, you need to use the KafkaTemplate (or Producer if you are using the native APIs) to send the offsets to the transaction.

    Using the consumer to commit the offset is not part of the transaction, so things will not work as expected.

    When using a transaction manager, the listener container binds the Producer to the thread so any downstream KafkaTemplate operations participate in the transaction that the consumer starts. See the documentation.
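
To make the point above concrete, here is a minimal sketch of why putting the offset commit inside the transaction prevents the duplicate. It is a toy in-memory model in plain Java, not the Kafka client API: the `Broker`, `Transaction`, `runConsumer`, and `simulate` names are all hypothetical and exist only for illustration. With the native API, the corresponding calls would be `sendOffsetsToTransaction(...)` and `commitTransaction()` on the producer. The model only captures the key property: the outbound records and the consumed offset become visible together or not at all.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical names, not the Kafka client API). */
public class TransactionalOffsetSketch {

    /** Durable state that survives a consumer crash; stands in for the broker. */
    static class Broker {
        final List<String> outboundTopic = new ArrayList<>();
        long committedOffset = 0; // next input offset to process
    }

    /** Work buffered in an open transaction; lost if the consumer dies. */
    static class Transaction {
        private final Broker broker;
        private final List<String> pendingRecords = new ArrayList<>();
        private long pendingOffset = -1;

        Transaction(Broker broker) { this.broker = broker; }

        void send(String record) { pendingRecords.add(record); }

        // Models sending the consumed offset to the transaction.
        void sendOffsetToTransaction(long nextOffset) { pendingOffset = nextOffset; }

        // Models the commit: records and offset are applied in one step.
        void commit() {
            broker.outboundTopic.addAll(pendingRecords);
            if (pendingOffset >= 0) broker.committedOffset = pendingOffset;
            pendingRecords.clear();
            pendingOffset = -1;
        }
    }

    /** Processes one input record from the committed offset; may "crash" before commit. */
    static void runConsumer(Broker broker, List<String> input, boolean crashBeforeCommit) {
        long offset = broker.committedOffset;
        if (offset >= input.size()) return;
        Transaction txn = new Transaction(broker);
        txn.send("out-" + input.get((int) offset));
        txn.sendOffsetToTransaction(offset + 1);
        if (crashBeforeCommit) return; // pending work is discarded, like an aborted transaction
        txn.commit();
    }

    static Broker simulate() {
        Broker broker = new Broker();
        List<String> input = List.of("a", "b");
        runConsumer(broker, input, true);  // first consumer dies after sending, before commit
        runConsumer(broker, input, false); // replacement consumer replays offset 0
        runConsumer(broker, input, false); // then processes offset 1
        return broker;
    }

    public static void main(String[] args) {
        Broker b = simulate();
        // prints "[out-a, out-b] committedOffset=2"
        System.out.println(b.outboundTopic + " committedOffset=" + b.committedOffset);
    }
}
```

The replayed record produces only one visible outbound message, because the first attempt never committed. In real Kafka, records from an aborted transaction are still written to the log, so consumers of the outbound topic need `isolation.level=read_committed` to skip them.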