Tags: apache-kafka, apache-kafka-streams, ksqldb

The "first offset" of a topic that backs a ksqlDB stream is always equal to the topic's "last offset"


Hi, I have this ksqlDB app:

CREATE OR REPLACE STREAM DATACHANGES_USER
    WITH (KAFKA_TOPIC='datachanges.user', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');

CREATE OR REPLACE STREAM REPARTITIONED_DATACHANGES_USER
    WITH (KAFKA_TOPIC='repartitioned.datachanges.user', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO')
    AS SELECT u.ROWKEY->ID AS USER_ID, u.BEFORE, u.AFTER
    FROM DATACHANGES_USER u
    PARTITION BY u.ROWKEY->ID
EMIT CHANGES;

The first stream, "DATACHANGES_USER", is backed by a topic that is created automatically by the Debezium connector.

The purpose of the second stream, "REPARTITIONED_DATACHANGES_USER", is to re-partition the first one by ID.

The topic that backs the first stream has its "first offset" equal to 0. (Screenshot: the first topic's offsets.)

The topic that backs the second stream has its "first offset" almost equal to its "last offset". (Screenshot: the second topic's offsets.)

How do you explain this behavior? Thank you.


Solution

  • Is the first auto-generated topic log compacted, i.e. does it have cleanup.policy = compact? In a log-compacted topic only the latest event for each key is retained, so the offsets of the remaining events are not consecutive (you could have offsets 5, 8, 15, 42, ...). In my experience, compacted topics still report a "first offset" of 0. A "first offset" of 0 can also persist because of an infinite retention policy.
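To see why compacted offsets are sparse, here is a toy Python sketch. It is not Kafka's actual segment-based compaction, just a model of its end result: per key, only the most recent record survives, while every record keeps its original offset.

```python
# Toy model of log compaction: for each key, only the latest record
# survives, so the surviving offsets are sparse, not consecutive.
def compact(log):
    """log: list of (offset, key) records in offset order.
    Returns the sorted offsets that survive compaction."""
    latest = {}
    for offset, key in log:
        latest[key] = offset  # later records overwrite earlier ones
    return sorted(latest.values())

# Seven records over three keys; only the last write per key remains.
log = list(enumerate(["a", "b", "a", "c", "a", "b", "a"]))
print(compact(log))  # surviving offsets: [3, 5, 6]
```

Note that if the record at offset 0 happens to be the latest one for its key, it survives compaction, which is one reason a compacted topic can keep reporting a first offset of 0.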

    As for the other topic: repartition topics are usually given a very short default retention policy, because they do not need to be kept in the cluster for long. Your application creates the repartition topic, writes events to it, and immediately reads them back. Those events are exactly the same as in the source topic, only with a different key, so there is no reason to retain them; deleting them quickly advances the topic's "first offset" until it is almost equal to the "last offset", and it also reduces the storage overhead on the Kafka cluster.
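The effect of a short retention on the "first offset" can be sketched with a toy Python model (hypothetical helper, not a Kafka API): records older than the retention window are deleted, which moves the log-start offset toward the end offset.

```python
# Toy model of time-based retention: expired records are deleted,
# advancing the "first offset" toward the end offset.
def first_offset_after_retention(records, now_ms, retention_ms):
    """records: list of (offset, timestamp_ms) in ascending offset order.
    Returns the first offset still retained, or the end offset when
    every record has expired."""
    cutoff = now_ms - retention_ms
    for offset, ts in records:
        if ts >= cutoff:
            return offset
    return records[-1][0] + 1  # all expired: first offset == end offset

# Ten records written one second apart, starting at t = 0 ms.
records = [(i, i * 1000) for i in range(10)]

# With a short retention (3 s) evaluated at t = 12 s, almost everything
# is deleted: the first offset is 9, right next to the end offset 10.
print(first_offset_after_retention(records, now_ms=12_000, retention_ms=3_000))
```

Real Kafka deletes whole log segments rather than individual records, but the net observation is the same: with a short retention, the first offset keeps chasing the last offset, which is exactly what you see on the repartition topic.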