apache-kafka-streams

Kafka Streams commit offset semantics


I just wanted to confirm something which I think is between the lines of the documentation. Would it be correct to say that committing in Kafka Streams is independent of whether the offset/message has been processed by the entire set of processing nodes of the application topology, and depends solely on the commit interval? In other words, whereas in a typical Kafka consumer application one would commit when a message is fully processed (as opposed to merely fetched), in Kafka Streams simply being fetched is enough for the commit interval to kick in and commit that message/offset? That is, even if that offset/message has not yet been processed by the entire set of processing nodes of the application topology?

Or are messages eligible to be committed only once the entire set of processing nodes of the topology has processed them and they are ready to go out to a topic or an external system?

In a sense the question could be summed up as: when are offsets/messages eligible to be committed in Kafka Streams? Is it conditional? If so, what is the condition?


Solution

  • You have to understand that a Kafka Streams program, i.e., its Topology, may contain multiple sub-topologies (https://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks). Sub-topologies are connected to each other via topics.

    A record can be committed once it has been fully processed by a sub-topology. In this case, the record's intermediate output is written to the topic that connects the two sub-topologies before the commit happens. The downstream sub-topology then reads from this "connecting topic" and commits offsets for that topic, as illustrated by the sketch below.
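
    For illustration, here is a minimal sketch of a program that ends up with two sub-topologies (the topic names "input" and "output" are made up). The re-keying step forces Kafka Streams to insert an internal repartition topic, which is exactly the kind of "connecting topic" described above:

        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.Topology;

        public class TwoSubTopologies {
            public static Topology build() {
                StreamsBuilder builder = new StreamsBuilder();

                // Sub-topology 0: reads "input" and changes the key. Because
                // the key changes, the downstream aggregation needs the data
                // redistributed via an internal repartition topic.
                builder.<String, String>stream("input")
                       .selectKey((key, value) -> value)
                       // Sub-topology 1: reads the repartition topic (the
                       // "connecting topic") and counts records per new key.
                       .groupByKey()
                       .count()
                       .toStream()
                       .to("output");

                return builder.build();
            }
        }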

    Committing indeed happens based on commit.interval.ms only. If a fetch returns, let's say, 100 records (offsets 0 to 99), and 30 of them have been processed by the sub-topology when commit.interval.ms hits, Kafka Streams first makes sure that the output of those 30 messages is flushed to Kafka (i.e., Producer.flush()) and afterwards commits offset 30 (the offset of the next record to be processed). The other 70 messages are just in an internal buffer of Kafka Streams and are processed after the commit. If the buffer is empty, a new fetch is sent. Each thread tracks commit.interval.ms independently and commits all of its tasks once the commit interval has passed.
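
    As a sketch, this is how you would tune the interval; the application id and bootstrap servers below are placeholders, and the default is 30000 ms (100 ms if processing.guarantee is set to exactly_once):

        import java.util.Properties;
        import org.apache.kafka.streams.StreamsConfig;

        public class CommitIntervalConfig {
            public static Properties props() {
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
                // Commit processed offsets (and flush pending output) every
                // 10 seconds instead of the 30-second default.
                props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
                return props;
            }
        }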

    Because committing happens on a per-sub-topology basis, it can happen that a record of the input topic is committed while the output topic does not yet contain the corresponding result, because the intermediate results have not yet been processed by the downstream sub-topology.

    You can inspect the structure of your program via Topology#describe() to see which sub-topologies it contains.
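
    For example, reusing the TwoSubTopologies sketch from above, you could print the description or walk it programmatically:

        import org.apache.kafka.streams.Topology;
        import org.apache.kafka.streams.TopologyDescription;

        public class InspectTopology {
            public static void main(String[] args) {
                Topology topology = TwoSubTopologies.build(); // from the sketch above

                // Human-readable dump of all sub-topologies with their
                // sources, processors, and sinks.
                System.out.println(topology.describe());

                // Or inspect the sub-topologies programmatically.
                TopologyDescription description = topology.describe();
                for (TopologyDescription.Subtopology sub : description.subtopologies()) {
                    System.out.println("Sub-topology " + sub.id() + " -> " + sub.nodes());
                }
            }
        }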