
Kafka Streams commits offset when producer throws an exception


In my Kafka Streams application I have a single processor that is scheduled to produce output messages every 60 seconds. Each output message is built from messages that come from a single input topic. Sometimes the output message is bigger than the limit configured on the broker (1MB by default). An exception is thrown and the application shuts down. The commit interval is set to the default (60s).

In such a case I would expect that on the next run all messages consumed during the 60s preceding the crash would be re-consumed. In reality, the offsets of those messages are committed and the messages are not processed again on the next run.

From reading answers to similar questions, it seems to me that the offset should not be committed. When I increase the commit interval to 120s (the processor still punctuates every 60s), it works as expected and the offset is not committed.

I am using the default processing guarantee, but I have also tried exactly_once. Both give the same result. Calling context.commit() from the processor seems to have no effect on the issue.

Am I doing something wrong here?


Solution

  • The contract of a Processor in Kafka Streams is that you have fully processed an input record and called forward() for all corresponding output messages before process() returns. This contract implies that Kafka Streams is allowed to commit the corresponding offset after process() returns.

    It seems you "buffer" messages in memory within process() to emit them later. This violates the contract. If you want to "buffer" messages, you should attach a state store to the Processor and put all those messages into the store (cf. https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#state-stores). The store is managed by Kafka Streams for you and it is fault-tolerant. This way, the state will be recovered after an error and you don't lose any data (even if the input messages are not reprocessed).

    I doubt that setting the commit interval to 120 seconds actually works as expected in all cases, because there is no alignment between when a commit happens and when the punctuation is called.
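
To illustrate the state store approach described above, here is a minimal sketch, assuming the Kafka 2.5-era Processor API that the linked documentation covers. The topic names ("input-topic", "output-topic"), the store name ("buffer-store"), the node names, and the class names are all hypothetical, and joining the buffered values with newlines is only a stand-in for however the real output message is built.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class BufferingTopology {

    // Hypothetical store name, chosen for this sketch only.
    static final String STORE = "buffer-store";

    static class BufferingProcessor implements Processor<String, String> {
        private ProcessorContext context;
        private KeyValueStore<String, String> buffer;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.buffer = (KeyValueStore<String, String>) context.getStateStore(STORE);
            // Emit the buffered messages every 60 seconds of wall-clock time.
            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, this::flush);
        }

        @Override
        public void process(String key, String value) {
            // Buffer in the fault-tolerant store instead of in memory.
            // topic-partition-offset gives each input record a unique store key.
            String id = context.topic() + "-" + context.partition() + "-" + context.offset();
            buffer.put(id, value);
        }

        private void flush(long timestamp) {
            List<String> keys = new ArrayList<>();
            StringBuilder output = new StringBuilder();
            // Store iterators must be closed; try-with-resources does that.
            try (KeyValueIterator<String, String> it = buffer.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> entry = it.next();
                    keys.add(entry.key);
                    output.append(entry.value).append('\n');
                }
            }
            if (keys.isEmpty()) {
                return;
            }
            context.forward(String.valueOf(timestamp), output.toString());
            // Remove the entries that went into the forwarded message.
            for (String k : keys) {
                buffer.delete(k);
            }
        }

        @Override
        public void close() {
            // Nothing to release; Kafka Streams manages the store.
        }
    }

    static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "input-topic");
        topology.addProcessor("buffer", BufferingProcessor::new, "source");
        // Attach the store to the processor node so getStateStore() can find it.
        topology.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String()),
                "buffer");
        topology.addSink("sink", "output-topic",
                Serdes.String().serializer(), Serdes.String().serializer(), "buffer");
        return topology;
    }
}
```

With this layout process() only writes to the store, so committing the input offset after it returns is consistent with the contract: the buffered data lives in the store, which Kafka Streams restores after a failure. The sketch does not address the oversized output message itself; splitting or limiting the emitted message would still be needed to stay under the broker limit.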