From the exactly-once KIP, concerning producer idempotency upon application restart with InitPidRequest:
2.1 When an TransactionalId is specified
If the transactional.id configuration is set, this TransactionalId passed along with the InitPidRequest, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the TransactionalId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.
In addition to returning the PID, the InitPidRequest performs the following tasks:
Bumps up the epoch of the PID, so that the any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer.
The handling of the InitPidRequest is synchronous. Once it returns, the producer can send data and start new transactions.
When the producer fails, is started again, and InitPidRequest is executed, in what situations is the last transaction 'rolled forward' (I guess this means committed) or 'rolled back'? How is this controlled?
The key component that enables Kafka to achieve this is the Transaction Coordinator. This was introduced as a part of the KIP that you've mentioned. The transaction coordinator is constructed by the broker as a part of the initialisation process and maintains the following information in memory:
1. TransactionalId to the assigned PID, the current epoch number (a counter that is bumped on each InitPidRequest, not a timestamp) and the transaction timeout value
2. PID to the current ongoing transaction status of the producer indicated by the PID, the participant topic-partitions and the last time this status was updated

Now, to answer your question on rolling a transaction forward or back:
When a producer fails and is restarted, it sends a new InitPidRequest to the Transaction Coordinator if the producer comes with a non-empty TransactionalId (supplied as a configuration parameter by the producer application).
The Transaction Coordinator, on receiving this request, checks whether there is already an entry for the supplied TransactionalId in the first in-memory mapping (Point 1 above). If a mapping exists, it looks up the PID in the second in-memory map (Point 2 above) to check whether there is an ongoing transaction against that PID:
If the status is BEGIN, then the transaction will be aborted (note: this is the rolled-back case).
If the status is PREPARE_ABORT or PREPARE_COMMIT, then the Transaction Coordinator will wait for the transaction to go through to either COMPLETE_ABORT (the rolled-back case) or COMPLETE_COMMIT (the rolled-forward case).

After this, the Transaction Coordinator responds with the latest PID and the current epoch for the TransactionalId, and the producer can then start sending new transactions.
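
To make the decision logic above concrete, here is a rough toy model of it in Python. It is only a sketch of the behaviour described in this answer, not Kafka's actual code: `ToyCoordinator`, `PidEntry`, `TxnEntry` and `init_pid` are names I made up, the "last updated" time is left out, and "waiting for the transaction to complete" is simplified to finishing it on the spot.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Set, Tuple


class TxnStatus(Enum):
    BEGIN = "BEGIN"
    PREPARE_COMMIT = "PREPARE_COMMIT"
    PREPARE_ABORT = "PREPARE_ABORT"
    COMPLETE_COMMIT = "COMPLETE_COMMIT"
    COMPLETE_ABORT = "COMPLETE_ABORT"


@dataclass
class PidEntry:
    """Point 1: state kept per TransactionalId (hypothetical layout)."""
    pid: int
    epoch: int
    timeout_ms: int


@dataclass
class TxnEntry:
    """Point 2: state kept per PID (hypothetical layout)."""
    status: TxnStatus
    partitions: Set[Tuple[str, int]] = field(default_factory=set)


class ToyCoordinator:
    """Illustrative stand-in for the Transaction Coordinator."""

    def __init__(self) -> None:
        self.pid_by_txn_id: Dict[str, PidEntry] = {}
        self.txn_by_pid: Dict[int, TxnEntry] = {}
        self._next_pid = 0

    def init_pid(self, transactional_id: str, timeout_ms: int = 60_000) -> Tuple[int, int]:
        entry = self.pid_by_txn_id.get(transactional_id)
        if entry is None:
            # Unknown TransactionalId: assign a fresh PID with epoch 0.
            entry = PidEntry(pid=self._next_pid, epoch=0, timeout_ms=timeout_ms)
            self._next_pid += 1
            self.pid_by_txn_id[transactional_id] = entry
            return entry.pid, entry.epoch

        # Known TransactionalId: bump the epoch so older instances are fenced.
        entry.epoch += 1
        txn = self.txn_by_pid.get(entry.pid)
        if txn is not None:
            if txn.status is TxnStatus.BEGIN:
                # Still open, never reached a prepare state: roll back.
                txn.status = TxnStatus.COMPLETE_ABORT
            elif txn.status is TxnStatus.PREPARE_ABORT:
                # Abort was already decided: finish rolling back.
                txn.status = TxnStatus.COMPLETE_ABORT
            elif txn.status is TxnStatus.PREPARE_COMMIT:
                # Commit was already decided: finish rolling forward.
                txn.status = TxnStatus.COMPLETE_COMMIT
        return entry.pid, entry.epoch


coordinator = ToyCoordinator()
pid, epoch = coordinator.init_pid("my-app")                  # first start
coordinator.txn_by_pid[pid] = TxnEntry(TxnStatus.BEGIN)      # producer dies mid-transaction
pid_again, new_epoch = coordinator.init_pid("my-app")        # restart
print(pid_again == pid, new_epoch, coordinator.txn_by_pid[pid].status.name)
```

So in this model the outcome is not something the restarted producer chooses: it depends only on how far the previous transaction had progressed when the old instance died.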
I've tried to keep the explanation to the minimum but if you're interested in more details then here's the detailed design document for your reference.
I hope this helps!