apache-kafka jvm apache-kafka-streams

Obtain Kafka 'writeahead' messages with JVM consumer


We have a streams app which makes use of exactly-once semantics, where one topic-partition has stalled. We notice that offsets are going up in increments of two, and understand that the odd-numbered messages are part of the 2-phase commit of Kafka transactions.

We have written a Consumer<Byte[], Byte[]> (using kafka-clients 2.1.0) to dump all of these messages to disk with isolation.level = "read_uncommitted", but it does not fetch these odd-numbered messages. Is there anything we can do to get them?


Solution

  • Control records are never exposed to consumers, whatever the isolation.level setting.

    To see them, run the DumpLogSegments tool against the partition's log segment files on the broker:

    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
    

    Control batches appear like normal batches, but with the isControl flag set to true. In the output below, offset 1618 is a data batch and offset 1619 is the control batch that follows it:

    baseOffset: 1618 lastOffset: 1618 count: 1 baseSequence: 1 lastSequence: 1 producerId: 1000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 1778601 CreateTime: 1547217145114 isvalid: true size: 1097 magic: 2 compresscodec: NONE crc: 1680083731

    baseOffset: 1619 lastOffset: 1619 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 1779698 CreateTime: 1547217145210 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 2028573478

    You can also use the --deep-iteration flag to show per-record metadata (or even --print-data-log to show the actual record data). In that case, the endTxnMarker field tells you whether the control record is a commit or an abort:

    offset: 1618 position: 1778601 CreateTime: 1547217145114 isvalid: true keysize: 3 valuesize: 1024 magic: 2 compresscodec: NONE producerId: 1000 producerEpoch: 0 sequence: 1 isTransactional: true headerKeys: []

    offset: 1619 position: 1779698 CreateTime: 1547217145210 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1000 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
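
If the dump is large, it may help to filter it programmatically. The following is a minimal sketch, assuming the DumpLogSegments output has the `key: value` layout shown in the samples above (the exact fields can differ between Kafka versions). The class name `ControlRecordScan` and its methods are hypothetical helpers written for this illustration and are not part of Kafka.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: scans DumpLogSegments text output for control records.
// Assumes the "key: value" line layout shown in the samples above.
final class ControlRecordScan {
    // Matches a leading "baseOffset: N" (batch line) or "offset: N" (record line).
    // Case-sensitive, so "lastOffset: N" is not picked up by mistake.
    private static final Pattern OFFSET =
            Pattern.compile("(?:^|\\s)(?:baseOffset|offset): (\\d+)");
    private static final Pattern MARKER =
            Pattern.compile("endTxnMarker: (\\w+)");

    // True if a batch-level line describes a control batch.
    static boolean isControlBatch(String line) {
        return line.contains("isControl: true");
    }

    // Extracts the (base) offset from a batch-level or record-level line.
    static long offsetOf(String line) {
        Matcher m = OFFSET.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("no offset field in: " + line);
        }
        return Long.parseLong(m.group(1));
    }

    // Returns the transaction marker type (e.g. COMMIT) from a record-level
    // line produced with --deep-iteration, or empty if the line has none.
    static Optional<String> markerOf(String line) {
        Matcher m = MARKER.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened copies of the sample lines above.
        String[] lines = {
            "baseOffset: 1618 lastOffset: 1618 count: 1 isTransactional: true isControl: false position: 1778601",
            "baseOffset: 1619 lastOffset: 1619 count: 1 isTransactional: true isControl: true position: 1779698",
            "offset: 1619 position: 1779698 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0"
        };
        for (String line : lines) {
            if (isControlBatch(line)) {
                System.out.println("control batch at offset " + offsetOf(line));
            }
            markerOf(line).ifPresent(marker ->
                    System.out.println("marker " + marker + " at offset " + offsetOf(line)));
        }
    }
}
```

In practice you would read the lines from the tool's output (for example, piped to a file) rather than from a hard-coded array. Matching on the printed text is fragile by nature, so treat this as a quick diagnostic aid rather than a stable interface.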