Search code examples
apache-kafkaapache-kafka-streamsspring-cloud-streamconfluent-platformconfluent-cloud

Apache Kafka on Confluent Cloud - Incoherent offsets in partitioned topic and consumer lag


I am finding a strange behaviour when using Kafka on Confluent Cloud. I have created a topic with the default partitioning value: 6.

My system consists of a Java Producer application that sends a message to that topic and a Kafka Streams application that reads from it and performs an operation per message.

-----------------------          --------            -----------
| Kafka Java Producer |  ---->  | topic | ---->      | KStream |
-----------------------          --------            -----------

At the moment I am starting only one instance of the Kafka Streams application, so the consumer group has one member.

This is what I've observed:

  1. The producer sends a message and it is recorded in the event topic with offset 0:

enter image description here

  1. The message reaches the KStream, being processed correctly as I can see in the KStream log trace:

KStream

events.foreach { key, value ->
    logger.info("--------> Processing TimeMetric {}", value)
    //Store in DB

Log

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {"...

  1. In the Confluent Cloud consumer lag I can see all consumer groups and their state. There's one for the KStream called events-processor-19549050-d8b0-4b39.... As stated before, this group has only one member (the only instance of the KStream). However, if shows that this group is behind one message in the partition 2. Besides, note that current offset seems to be 1 and end offset 2):

enter image description here

  1. If I send another message in the producer, it is recorded again in the topic but this time with offset 2 instead of 1:

enter image description here

  1. The message reaches the KStream and is processed normally again:

[-StreamThread-1] uration$$EnhancerBySpringCGLIB$$e72e3f00 : --------> Processing Event {

  1. And going back to the consumer lag of the consumer group it is still one message behind, still with some strange offsets (current 3, end 4):

enter image description here

Although the processing seems to be fine, the state showed above doesn't make much sense. Can you explain the reasons why:

  1. the message offsets are incremented +2 instead of +1?
  2. the consumer group seems to be 1 message behind even though it processed the messages correctly?

Solution

  • For the first question, there are two possibilities (although it seems, by reading the 2nd question, that you are using transactions):

    • If you are not using exactly-once semantics, the producer may send more than one message, as there's no control on the wire of what it was sent before. This way, Kafka's default at-least-once semantics may increase your offset number >+1 because of those duplicated messages.

    • If you are using exactly-once semantics, or transactions, each event of a transaction writes a mark into the topic, for internal control purposes. Those marks are responsible for the +2 increase, as they also are stored in the topic (but avoided by the consumer). Confluent's guide to transactions also has some info about this behaviour:

      After the producer initiates a commit (or an abort), the coordinator begins the two phase commit protocol.

      In the first phase, the coordinator updates its internal state to “prepare_commit” and updates this state in the transaction log. Once this is done the transaction is guaranteed to be committed no matter what.

      The coordinator then begins phase 2, where it writes transaction commit markers to the topic-partitions which are part of the transaction.

      These transaction markers are not exposed to applications, but are used by consumers in read_committed mode to filter out messages from aborted transactions and to not return messages which are part of open transactions (i.e., those which are in the log but don’t have a transaction marker associated with them).

      Once the markers are written, the transaction coordinator marks the transaction as “complete” and the producer can start the next transaction.

    Generally speaking, you should not care about the offset number, as it is not a definitive-guide to look. Retries, duplicates or transaction marks, for example, makes the offset differ from what you expect looking at your producer, but you should not worry about it; Your consumers will, and they'll only take care of "real" messages.

    Regarding question 2, this is a known issue: https://issues.apache.org/jira/browse/KAFKA-6607

    Quoting the jira:

    When an input topic for a Kafka Streams application is written using transaction, Kafka Streams does not commit "endOffset" but "endOffset - 1" if it reaches the end of topic. The reason is the commit marker that is the last "message" in the topic; Streams commit "offset of last processed message plus 1" and does not take commit markers into account.

    This is not a correctness issue, but when one inspect the consumer lag via bin/kafka-consumer.group.sh the lag is show as 1 instead of 0 – what is correct from consumer-group tool point of view.

    Hope it helps!