Tags: apache-kafka, kafka-consumer-api, apache-kafka-connect, debezium

Kafka consumer - how to recognize offset skipping/missing offsets?


Setup:
We have a Debezium/Kafka Connect setup with a Debezium Oracle producer and a Confluent JDBC consumer/sink.

Starting position / background / problem:
Due to high traffic we have decreased log.retention.minutes to 60 (one hour), which is sufficient 99% of the time. In some rare cases, however, one of the Kafka consumers slows down and can no longer keep up. Messages are then deleted from Kafka (because of the retention period above) before the consumer has picked them up and handled them. With the default config, the consumer then skips the missing records by choosing the earliest available offset. This leads to inconsistencies on the target side.

Question:
How can we handle those situations (if raising log.retention.minutes isn't an option)?
Note: We would be fine if the consumer simply threw an exception, stopped, etc. when it can't find a message at its given offset.

What we've tried so far:
We tried setting auto.offset.reset to none for the consumer and expected the consumer to stop in case it can't find an offset. In theory this should work. In practice it immediately throws an exception when the consumer gets instantiated, because there's no first/initial offset.

Final thoughts:
So is there another config parameter we could use? (Something like "throw exception if offset is missing/skipped, but not on first start"?) Or is there a JMX metric we could monitor in case a consumer is skipping messages?


Solution

    > setting auto.offset.reset to none for the consumer and expected the consumer to stop in case it can't find an offset

    That's what it'll do, yes.

    > In practice it immediately throws an exception when the consumer gets instantiated because there's no first/initial offset

    You'll need to actually initialize the group first, then seek it to the earliest offset while the group has no active members. E.g. kafka-consumer-groups --bootstrap-server <broker> --group connect-<name> --topic <topic> --reset-offsets --to-earliest --execute
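Why none fails on the very first start can be seen in a small model of how a consumer picks its starting position. This is a simplified sketch and not Kafka's client code: resolve_start and the two exception classes are hypothetical names, loosely modelled on the Java client's NoOffsetForPartitionException and OffsetOutOfRangeException.

```python
class NoOffsetForPartition(Exception):
    """No committed offset exists and the reset policy is 'none'."""


class OffsetOutOfRange(Exception):
    """The committed offset is no longer inside the partition's log."""


def resolve_start(committed, log_start, log_end, policy):
    """Return the offset a consumer would start from (simplified model).

    committed: the group's committed offset, or None if it never committed.
    log_start: earliest offset still retained on the broker.
    log_end:   offset that the next produced record will get.
    policy:    value of auto.offset.reset ("earliest", "latest" or "none").
    """
    if policy not in ("earliest", "latest", "none"):
        raise ValueError(f"unknown policy: {policy}")

    def reset(error):
        # With "none" there is no fallback, so the error reaches the caller.
        if policy == "none":
            raise error
        return log_start if policy == "earliest" else log_end

    if committed is None:
        # First start of a new group: nothing has been committed yet.
        return reset(NoOffsetForPartition("group has no committed offset"))
    if not log_start <= committed <= log_end:
        # Retention removed the records the group still had to read.
        return reset(OffsetOutOfRange(f"offset {committed} is not in the log"))
    return committed
```

In this model the first start (committed is None) and the lost-records case (committed below log_start) both go through the same policy, which is why the group needs a committed offset before none can be used safely.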

    > Something like "throw exception if offset is missing/skipped, but not on first start"?

    There's nothing in auto.offset.reset to differentiate between "first" and "next" starts. But you could create the connector with consumer.override.auto.offset.reset=earliest (the worker's connector.client.config.override.policy has to permit consumer overrides), wait for it to be running, then set it back to none with a PUT /connectors/<name>/config call. Repeat this whenever it stops running again.
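The config switch described above could be scripted roughly as follows. This is a sketch using only the Python standard library; the worker URL, the connector name and both helper functions are hypothetical, and it only builds the request rather than sending it.

```python
import json
import urllib.request


def with_reset_policy(config, policy):
    """Return a copy of a connector config with the consumer reset policy changed."""
    updated = dict(config)
    updated["consumer.override.auto.offset.reset"] = policy
    return updated


def build_put_config_request(base_url, connector, config):
    """Build (but do not send) a PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{connector}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Hypothetical connector config; a real one has more keys.
running_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "consumer.override.auto.offset.reset": "earliest",
}

# Once the connector is running and has committed offsets, switch to "none".
strict_config = with_reset_policy(running_config, "none")
request = build_put_config_request(
    "http://localhost:8083", "my-jdbc-sink", strict_config
)
```

Passing request to urllib.request.urlopen would apply the change. Before doing so, it seems sensible to check GET /connectors/<name>/status so the switch only happens once the tasks are actually running.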

    > JMX metric we could monitor in case a consumer is skipping messages

    Not that I know of; the metrics mostly report bytes processed. You'd additionally have to track how many bytes you expect it to read.

    You'd need other monitoring solutions to detect log segments being deleted on the broker, and to compare the deleted offset ranges with the offsets your consumer is currently reading.
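The comparison itself is simple once both sets of offsets are available. The sketch below assumes they have already been collected per partition (for example the group's committed offsets and each partition's earliest retained offset); lost_ranges is a hypothetical helper, not part of any Kafka tool.

```python
def lost_ranges(committed, log_start):
    """Find offsets that were deleted before the consumer group read them.

    committed: dict of partition -> committed offset (the next offset to read).
    log_start: dict of partition -> earliest offset still retained on the broker.
    Returns a dict of partition -> (first_lost, last_lost), both inclusive.
    """
    lost = {}
    for partition, next_offset in committed.items():
        start = log_start.get(partition)
        # If the log now begins after the committed position, everything
        # in between has been removed by retention.
        if start is not None and start > next_offset:
            lost[partition] = (next_offset, start - 1)
    return lost


# Hypothetical snapshot: partition 0 has fallen behind retention, partition 1 has not.
snapshot = lost_ranges(
    committed={0: 1200, 1: 5000},
    log_start={0: 1500, 1: 4800},
)
```

Running such a check on a schedule and alerting on a non-empty result would flag the gap even if the consumer itself silently resets to the earliest offset.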