I am seeing some strange behaviour when using Kafka on Confluent Cloud. I have created a topic with the default number of partitions: 6.
My system consists of a Java Producer application that sends a message to that topic and a Kafka Streams application that reads from it and performs an operation per message.
-----------------------       ---------       -----------
| Kafka Java Producer | ----> | topic | ----> | KStream |
-----------------------       ---------       -----------
At the moment I am starting only one instance of the Kafka Streams application, so the consumer group has one member.
This is what I've observed:
KStream

    events.foreach { key, value ->
        logger.info("--------> Processing TimeMetric {}", value)
        // Store in DB
    }
Log

    [-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {"...

Consumer group: events-processor-19549050-d8b0-4b39...

As stated before, this group has only one member (the only instance of the KStream). However, it shows that the group is behind by one message in partition 2 (note that the current offset seems to be 1 and the end offset 2).
Although the processing seems to be fine, the state shown above doesn't make much sense. Can you explain why the offset advanced by 2 when only one message was produced, and why the group shows a lag of 1 even though the message has been processed?
For the first question, there are two possibilities (although, judging by the second question, it seems you are using transactions):
If you are not using exactly-once semantics, the producer may send the same message more than once, as there is no control over what has already made it onto the wire. Under Kafka's default at-least-once semantics, those duplicates can increase the offset by more than 1 for a single logical message.
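As a rough illustration (the broker address and topic name below are made up for the example), a plain non-transactional producer sketch could look like this: with retries enabled and idempotence disabled, a retried send that had actually reached the broker gets written again, so the partition's end offset can grow by more than 1 for one logical message; setting `enable.idempotence=true` lets the broker de-duplicate those retries.

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // assumed broker address
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        // Retries protect against lost responses, but without idempotence a retried
        // send can land in the log twice and bump the end offset by more than 1.
        put(ProducerConfig.RETRIES_CONFIG, 5)
        put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false) // set to true to de-duplicate retries
    }
    KafkaProducer<String, String>(props).use { producer ->
        // One logical message; "events" is a hypothetical topic name.
        producer.send(ProducerRecord("events", "key-1", "one logical message"))
        producer.flush()
    }
}
```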
If you are using exactly-once semantics, i.e. transactions, each transaction, when committed or aborted, writes a control marker into the topic for internal bookkeeping. Those markers are responsible for the +2 increase, as they are also stored in the topic (but filtered out by the consumer). Confluent's guide to transactions also has some info about this behaviour:
After the producer initiates a commit (or an abort), the coordinator begins the two phase commit protocol.
In the first phase, the coordinator updates its internal state to “prepare_commit” and updates this state in the transaction log. Once this is done the transaction is guaranteed to be committed no matter what.
The coordinator then begins phase 2, where it writes transaction commit markers to the topic-partitions which are part of the transaction.
These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don’t have a transaction marker associated with them).
Once the markers are written, the transaction coordinator marks the transaction as “complete” and the producer can start the next transaction.
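To make the quoted behaviour concrete, here is a minimal sketch of a transactional producer (broker address, topic name and transactional.id are assumptions for the example). The single record sent inside the transaction occupies one offset, and the commit marker written on commitTransaction() occupies the next one, which is exactly the +2 you observed for one message:

```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")     // assumed broker address
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
        put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "events-producer-tx")  // hypothetical transactional.id
    }
    KafkaProducer<String, String>(props).use { producer ->
        producer.initTransactions()
        producer.beginTransaction()
        // The record itself takes one offset in the partition...
        producer.send(ProducerRecord("events", "key-1", "one logical message"))
        // ...and the commit marker written here takes the next offset,
        // so the log-end offset advances by 2 for a single message.
        producer.commitTransaction()
    }
}
```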
Generally speaking, you should not care too much about the offset number, as it is not a definitive count of what was produced. Retries, duplicates or transaction markers, for example, make the offset differ from what you would expect by looking at your producer, but you should not worry about it; your consumers will, and they will only ever see the "real" messages.
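If you want to see that filtering from a plain consumer, a minimal sketch with isolation.level=read_committed could look like the following (broker address, topic and group id are assumptions). In Kafka Streams you don't set this directly: configuring processing.guarantee=exactly_once puts its internal consumers into read_committed mode.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // assumed broker address
        put(ConsumerConfig.GROUP_ID_CONFIG, "offset-inspection-demo")    // hypothetical group id
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer")
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Transaction markers and records from aborted transactions are never
        // returned by poll() in read_committed mode; only "real" messages are.
        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    }
    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("events"))                              // hypothetical topic name
        val records = consumer.poll(Duration.ofSeconds(5))
        records.forEach { record ->
            println("partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
        }
    }
}
```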
Regarding question 2, this is a known issue: https://issues.apache.org/jira/browse/KAFKA-6607
Quoting the JIRA ticket:
When an input topic for a Kafka Streams application is written using transaction, Kafka Streams does not commit "endOffset" but "endOffset - 1" if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams commit "offset of last processed message plus 1" and does not take commit markers into account.
This is not a correctness issue, but when one inspect the consumer lag via bin/kafka-consumer-groups.sh the lag is show as 1 instead of 0 – what is correct from consumer-group tool point of view.
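To make that off-by-one concrete, the sketch below (broker address and group id are assumptions) reproduces the same lag arithmetic the consumer-group tool uses: lag = log-end offset minus committed offset. Streams commits "offset of last processed message plus 1", but the commit marker sitting after it pushes the log-end offset one further, so the difference shows up as 1.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties

fun main() {
    val bootstrap = "localhost:9092"          // assumed broker address
    val groupId = "events-processor"          // hypothetical Streams application.id / group id

    // Committed offsets of the consumer group, as the tooling would read them.
    val admin = AdminClient.create(Properties().apply { put("bootstrap.servers", bootstrap) })
    val committed = admin.listConsumerGroupOffsets(groupId)
        .partitionsToOffsetAndMetadata().get()

    // A throw-away consumer just to look up the log-end offsets of those partitions.
    val consumerProps = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    }
    KafkaConsumer<ByteArray, ByteArray>(consumerProps).use { consumer ->
        val endOffsets = consumer.endOffsets(committed.keys)
        committed.forEach { (tp, committedOffset) ->
            // With a commit marker at the end of the partition, the end offset is one past
            // the last offset Streams will ever commit, so the lag is reported as 1.
            val lag = (endOffsets[tp] ?: 0L) - committedOffset.offset()
            println("$tp committed=${committedOffset.offset()} end=${endOffsets[tp]} lag=$lag")
        }
    }
    admin.close()
}
```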
Hope it helps!