apache-kafka, apache-kafka-streams

Difference between idempotence and exactly-once in Kafka Streams


I was going through the documentation, and what I understood is that we can achieve an exactly-once transaction by enabling idempotence=true:

idempotence: The idempotent producer enables exactly-once for a producer against a single topic. Basically, each single message send has stronger guarantees and will not be duplicated in case there's an error

So if we already have idempotence, why do we need the separate exactly-once property in Kafka Streams? What exactly is the difference between idempotence and exactly-once?

And why is the exactly-once property not available in the normal Kafka producer?


Solution

  • In a distributed environment, failure is a very common scenario that can happen at any time. In a Kafka environment, the broker can crash, the network can fail, processing can fail, publishing a message can fail, or consuming a message can fail, etc. These different scenarios introduce different kinds of data loss and duplication.

    Failure scenarios

    A (Ack Failed): The producer published a message successfully with retries > 1 but could not receive the acknowledgment due to a failure. In that case, the producer will retry the same message, which might introduce a duplicate.


    B (Producer process failed in batch messages): The producer was sending a batch of messages and failed with a few already published successfully. In that case, once the producer restarts, it will republish all messages from the batch, which will introduce duplicates in Kafka.

    C (Fire & Forget Failed): The producer published a message with retries=0 (fire and forget). In case of a failure, the producer will not be aware of it and will send the next message, which causes the message to be lost.

    D (Consumer failed in batch message): A consumer receives a batch of messages from Kafka and manually commits their offsets (enable.auto.commit=false). If the consumer fails before committing to Kafka, the next time it will consume the same records again, which reproduces duplicates on the consumer side.

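    To make scenario D concrete, here is a minimal consumer sketch with manual offset commits (broker address, group id, and topic name are hypothetical). A crash between processing a record and commitSync() means the entire batch is redelivered after a restart:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // manual commit

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-a"));                           // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // If the process crashes here, the offset below is never committed,
                    // so the whole batch is consumed again after a restart (duplicates).
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                consumer.commitSync(); // commit only after the batch is fully processed
            }
        }
    }
}
```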

    Exactly-Once semantics

    In this case, even if a producer retries sending a message, the message will be published to and consumed by consumers exactly once.

    To achieve exactly-once semantics, Kafka uses the following three properties:

    1. enable.idempotence=true (addresses A, B & C)
    2. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5 (must be less than or equal to 5 when idempotence is enabled)
    3. isolation.level=read_committed (addresses D)

    Enable idempotence (enable.idempotence=true)

    Idempotent delivery enables the producer to write a message to Kafka exactly once to a particular partition of a topic during the lifetime of a single producer, without data loss and with per-partition ordering.

    "Note that enabling idempotence requires MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION to be less than or equal to 5, RETRIES_CONFIG to be greater than 0 and ACKS_CONFIG be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown"

    To achieve idempotence, Kafka uses a unique id called the producer id (PID) together with a sequence number when producing messages. The producer keeps incrementing the sequence number on each message published, mapped to its unique PID. The broker always compares the current sequence number with the previous one: it rejects the message if the new one is not exactly +1 greater than the previous one, which avoids duplication; at the same time, a gap of more than +1 indicates lost messages.

    In a failure scenario, the broker will compare the sequence number with the previous one, and if the sequence has not increased by exactly +1, it will reject the message.

    Transaction (isolation.level)

    Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be. It allows you to commit your consumer offsets in the same transaction along with the data you have processed, thereby allowing end-to-end exactly-once semantics.

    Rather than sending each message independently, the producer wraps its writes with beginTransaction, commitTransaction, and abortTransaction (in case of failure), while the consumer uses isolation.level set to either read_committed or read_uncommitted:

    • read_committed: Consumers will always read committed data only.
    • read_uncommitted: Read all messages in offset order without waiting for transactions to be committed.

    If a consumer with isolation.level=read_committed reaches a control message for a transaction that has not completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms (default: 1 minute).
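    Putting the producer-side transaction API together, a minimal sketch might look as follows (transactional.id, broker address, and topic names are hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionalProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Setting a transactional.id implicitly enables idempotence; it must be
        // unique per producer instance.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id");      // hypothetical id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // registers the transactional.id with the coordinator
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic-a", "k1", "v1"));   // hypothetical topics
                producer.send(new ProducerRecord<>("topic-b", "k2", "v2"));
                producer.commitTransaction(); // both records become visible atomically
            } catch (Exception e) {
                // Neither record ever becomes visible to read_committed consumers.
                producer.abortTransaction();
            }
        }
    }
}
```

    On the consumer side, the only change is the isolation level, e.g. props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed").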

    Exactly-Once in Producer & Consumer

    In the normal case, where we have separate producers and consumers, the producer has to be idempotent and at the same time manage transactions, so that consumers can use isolation.level=read_committed to make the whole process an atomic operation. This guarantees that the producer will always stay in sync with the source system. Even if the producer crashes or a transaction is aborted, it remains consistent and publishes a message or a batch of messages as a unit, exactly once.

    Likewise, the consumer will receive a message or a batch of messages as a unit, exactly once.

    With exactly-once semantics, the producer and the consumer together appear as an atomic operation, operating as one unit: messages are either published and consumed exactly once, or aborted.
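    A minimal sketch of this end-to-end read-process-write unit (group id, transactional.id, broker address, and topic names are hypothetical). The key call is sendOffsetsToTransaction, which commits the consumed offsets inside the same transaction as the produced output:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ReadProcessWrite {
    public static void main(String[] args) {
        Properties c = new Properties();
        c.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // assumed broker
        c.put(ConsumerConfig.GROUP_ID_CONFIG, "rpw-group");                   // hypothetical group
        c.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");             // offsets go in the tx
        c.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        c.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        c.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        p.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "rpw-tx-id");           // hypothetical id
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
             KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            consumer.subscribe(List.of("topic-a"));                           // hypothetical topics
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        // "process" step: a trivial transform as a stand-in
                        producer.send(new ProducerRecord<>("topic-b", r.key(),
                                r.value().toUpperCase()));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                new OffsetAndMetadata(r.offset() + 1));
                    }
                    // Commit the consumed offsets inside the same transaction as the output.
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (Exception e) {
                    producer.abortTransaction(); // output and offsets roll back together
                }
            }
        }
    }
}
```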

    Exactly-Once in Kafka Streams

    Kafka Streams consumes messages from topic A, processes them, and publishes messages to topic B; once published, it uses commit (commit mostly runs behind the scenes) to flush all state store data to disk.

    Exactly-once in Kafka Streams is a read-process-write pattern that guarantees this operation is treated as an atomic operation. Since Kafka Streams handles the producer, consumer, and transactions all together, it comes with a special parameter, processing.guarantee, which can be exactly_once or at_least_once, making life easy by not having to handle all the parameters separately.

    Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics, and production to output topics all together. If any one of these steps fails, all of the changes are rolled back.

    processing.guarantee=exactly_once automatically provides the parameters below, so you do not need to set them explicitly (see the sketch after this list):

    1. isolation.level=read_committed
    2. enable.idempotence=true
    3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5
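    As a minimal sketch (application id, broker address, and topic names are hypothetical), the single processing.guarantee line replaces all of the plumbing above:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ExactlyOnceStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-demo");           // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // The single switch for the whole read-process-write transaction;
        // on Kafka 3.0+ the value "exactly_once_v2" is preferred.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("topic-a");           // hypothetical topics
        source.mapValues(v -> v.toUpperCase()).to("topic-b");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```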