
How does kafka know whether to "roll forward or roll back" a transaction?


From the exactly-once KIP, concerning producer idempotency upon application restart with InitPidRequest:

2.1 When a TransactionalId is specified

If the transactional.id configuration is set, this TransactionalId is passed along with the InitPidRequest, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the TransactionalId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.

In addition to returning the PID, the InitPidRequest performs the following tasks:

  1. Bumps up the epoch of the PID, so that any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.

  2. Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer. The handling of the InitPidRequest is synchronous. Once it returns, the producer can send data and start new transactions.

When the producer fails, is started again and InitPidRequest is executed, in what situations is the last transaction 'rolled forward' (I guess this means committed) or 'rolled back'? How is this controlled?


Solution

  • The key component that enables Kafka to achieve this is the Transaction Coordinator, which was introduced as part of the KIP you mentioned. The transaction coordinator is created by the broker during its initialisation and maintains the following information in memory (backed by the transaction log, so that it can be rebuilt after a failure):

    1. A map from TransactionalId to the assigned PID, the current epoch number (a counter that is bumped on every InitPidRequest) and the transaction timeout value
    2. A map from PID to the status of that producer's current transaction, the topic-partitions participating in it, and the last time this status was updated
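
    The two maps can be pictured as ordinary dictionaries. The sketch below is a hypothetical Python model, not Kafka's actual coordinator classes: `TxnIdEntry`, `TxnMetadata`, `add_partition` and all field names are made up for illustration, and the status names follow the ones used later in this answer. It assumes, for simplicity, that adding the first partition is what puts a transaction into BEGIN.

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Set, Tuple


class TxnStatus(Enum):
    BEGIN = "BEGIN"
    PREPARE_COMMIT = "PREPARE_COMMIT"
    PREPARE_ABORT = "PREPARE_ABORT"
    COMPLETE_COMMIT = "COMPLETE_COMMIT"
    COMPLETE_ABORT = "COMPLETE_ABORT"


@dataclass
class TxnIdEntry:
    """Value of map 1: what the coordinator knows about one TransactionalId."""
    pid: int
    epoch: int       # counter, bumped on every InitPidRequest
    timeout_ms: int  # transaction timeout


@dataclass
class TxnMetadata:
    """Value of map 2: the current transaction of one PID."""
    status: TxnStatus
    partitions: Set[Tuple[str, int]] = field(default_factory=set)
    last_update_ms: int = 0


# Map 1: TransactionalId -> PID, epoch, transaction timeout
txn_id_map: Dict[str, TxnIdEntry] = {}

# Map 2: PID -> status, participating topic-partitions, last update time
pid_map: Dict[int, TxnMetadata] = {}


def add_partition(pid: int, topic: str, partition: int) -> None:
    """Hypothetical helper: record that the PID's transaction writes to a partition."""
    # Simplifying assumption: the first partition added starts the transaction in BEGIN.
    meta = pid_map.setdefault(pid, TxnMetadata(status=TxnStatus.BEGIN))
    meta.partitions.add((topic, partition))
    meta.last_update_ms = int(time.time() * 1000)


if __name__ == "__main__":
    txn_id_map["orders-app"] = TxnIdEntry(pid=1000, epoch=0, timeout_ms=60_000)
    add_partition(1000, "orders", 0)
    add_partition(1000, "payments", 3)
    print(pid_map[1000].status, sorted(pid_map[1000].partitions))
```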

    Now, to answer your question on rolling forward or back a transaction:

    When a producer fails and is restarted, it sends a new InitPidRequest to the Transaction Coordinator, provided it is configured with a non-empty TransactionalId (the transactional.id setting supplied by the producer application).

    On receiving this request, the Transaction Coordinator checks whether there is already an entry for the supplied TransactionalId in the first in-memory map (point 1 above). If there is, it looks up the PID in the second map (point 2 above) to check whether there is an ongoing transaction for that PID:

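    • If the transaction has started but neither a commit nor an abort has been requested (status BEGIN), the coordinator aborts it. This is the roll-back case.
    • If the transaction is already in PREPARE_ABORT or PREPARE_COMMIT, the decision has already been written to the transaction log, so the Transaction Coordinator waits for the transaction to go through to COMPLETE_ABORT (rolled back) or COMPLETE_COMMIT (rolled forward) respectively.

    In other words, the outcome depends on how far the failed producer got: if its commit request had already been recorded as PREPARE_COMMIT, the transaction is rolled forward; if the transaction was still open or was being aborted, it is rolled back.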
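
    The two cases above boil down to a small decision table. Here is a minimal Python sketch, assuming the status names used in this answer; `recovery_steps` is a hypothetical function (not a Kafka API) that returns the state transitions the coordinator would still have to go through. Since an abort is also a two-phase step, the sketch assumes the BEGIN case passes through PREPARE_ABORT before reaching COMPLETE_ABORT.

```python
from enum import Enum
from typing import List, Optional


class TxnStatus(Enum):
    BEGIN = "BEGIN"
    PREPARE_COMMIT = "PREPARE_COMMIT"
    PREPARE_ABORT = "PREPARE_ABORT"
    COMPLETE_COMMIT = "COMPLETE_COMMIT"
    COMPLETE_ABORT = "COMPLETE_ABORT"


def recovery_steps(status: Optional[TxnStatus]) -> List[TxnStatus]:
    """Transitions needed to finish a transaction left behind by a failed producer.

    `status` is None when the PID has no transaction recorded at all.
    """
    if status is TxnStatus.BEGIN:
        # No commit or abort was requested before the failure: roll back.
        return [TxnStatus.PREPARE_ABORT, TxnStatus.COMPLETE_ABORT]
    if status is TxnStatus.PREPARE_COMMIT:
        # The commit decision is already in the transaction log: roll forward.
        return [TxnStatus.COMPLETE_COMMIT]
    if status is TxnStatus.PREPARE_ABORT:
        # The abort decision is already in the transaction log: finish rolling back.
        return [TxnStatus.COMPLETE_ABORT]
    # None, COMPLETE_COMMIT or COMPLETE_ABORT: nothing left to recover.
    return []


if __name__ == "__main__":
    for s in [None, *TxnStatus]:
        print(s, "->", recovery_steps(s))
```

    Keeping the decision in a pure function like this makes the key point visible: the coordinator never guesses what the application wanted; it only finishes what the transaction log already says.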

    After this, the Transaction Coordinator responds with the PID and the bumped epoch for the TransactionalId, and the producer can then start sending new transactions.
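
    The epoch in that response is what fences off the old instance, as the KIP excerpt in the question describes. Below is a toy Python model of that behaviour, assuming a single coordinator owns the TransactionalId: the same TransactionalId gets the same PID back with a higher epoch, and a request still carrying the old epoch is rejected. `ToyCoordinator`, `init_pid`, `check_epoch`, `ProducerFencedError` and the starting PID 1000 are hypothetical names and values, not Kafka's API.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


class ProducerFencedError(Exception):
    """Raised when a request carries an epoch older than the current one."""


@dataclass
class PidEntry:
    pid: int
    epoch: int


class ToyCoordinator:
    """Toy model of PID reuse and epoch fencing; not Kafka's broker code."""

    def __init__(self) -> None:
        self._entries: Dict[str, PidEntry] = {}
        self._next_pid = 1000  # arbitrary starting PID for the example

    def init_pid(self, transactional_id: str) -> Tuple[int, int]:
        """Handle an InitPidRequest and return (PID, epoch)."""
        entry = self._entries.get(transactional_id)
        if entry is None:
            # Unknown TransactionalId: assign a fresh PID.
            entry = PidEntry(pid=self._next_pid, epoch=0)
            self._next_pid += 1
            self._entries[transactional_id] = entry
        else:
            # Known TransactionalId: same PID, higher epoch.
            # (Recovery of any left-over transaction would happen here.)
            entry.epoch += 1
        return entry.pid, entry.epoch

    def check_epoch(self, transactional_id: str, epoch: int) -> None:
        """Reject requests from an instance holding an outdated epoch."""
        current = self._entries[transactional_id].epoch
        if epoch < current:
            raise ProducerFencedError(
                f"epoch {epoch} is older than current epoch {current}"
            )


if __name__ == "__main__":
    coordinator = ToyCoordinator()
    old_pid, old_epoch = coordinator.init_pid("orders-app")  # first instance
    new_pid, new_epoch = coordinator.init_pid("orders-app")  # restarted instance
    coordinator.check_epoch("orders-app", new_epoch)  # accepted
    try:
        coordinator.check_epoch("orders-app", old_epoch)  # zombie instance
    except ProducerFencedError as err:
        print("zombie fenced:", err)
```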

    I've tried to keep the explanation to a minimum, but if you're interested in more details, here's the detailed design document for your reference.

    I hope this helps!