Setup:
We have a Debezium/Kafka Connect setup with a Debezium Oracle producer and a Confluent JDBC consumer/sink.
Starting position / background / problem:
Due to high traffic, we have decreased log.retention.minutes to 1h, which is suitable 99% of the time.
But in some rare cases, one of the Kafka consumers slows down and can't keep up any longer. In that case, messages are deleted from Kafka (due to the aforementioned retention period) before the consumer has picked them up and handled them.
In the default config, the consumer then skips the missing records by resetting to the earliest available offset. This leads to inconsistencies on the target side.
Question:
How can we handle these situations (if raising log.retention.minutes isn't an option)?
Note: We would be fine if the consumer simply threw an exception/stopped/etc. when it can't find a message for its committed offset.
What we've tried so far:
We tried setting auto.offset.reset to none for the consumer, expecting the consumer to stop when it can't find its offset. In theory this should work. In practice, it immediately throws an exception when the consumer is instantiated, because there is no first/initial offset.
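To see why none fails on the very first start, here is a simplified model of how a consumer picks its starting position for one partition. It is a Python sketch, not Kafka's actual code: resolve_start_offset and NoValidOffsetError are hypothetical names (the real Java client raises NoOffsetForPartitionException or OffsetOutOfRangeException depending on the case). What it illustrates is that a single auto.offset.reset policy covers both "no committed offset yet" and "committed offset already deleted".

```python
from typing import Optional


class NoValidOffsetError(Exception):
    """Hypothetical stand-in for the exceptions the real client raises under 'none'."""


def resolve_start_offset(committed: Optional[int], log_start: int,
                         log_end: int, reset_policy: str) -> int:
    """Simplified model of where a consumer starts reading one partition.

    committed:    the group's committed offset, or None if it has none yet
    log_start:    earliest offset still on the broker (advances as retention
                  deletes old segments)
    log_end:      offset the next produced record will get
    reset_policy: value of auto.offset.reset ("earliest", "latest", "none")
    """
    # A committed offset that is still inside the retained range is used as-is.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise there is either no committed offset (first start) or it points
    # at deleted data. The same policy decides both cases.
    if reset_policy == "earliest":
        return log_start  # silently skips [committed, log_start) if data was deleted
    if reset_policy == "latest":
        return log_end
    if reset_policy == "none":
        raise NoValidOffsetError(
            f"no usable offset: committed={committed}, "
            f"retained range=[{log_start}, {log_end}]")
    raise ValueError(f"unknown auto.offset.reset value: {reset_policy}")
```

For example, resolve_start_offset(None, 500, 900, "none") raises, just like the first-start failure described above, while resolve_start_offset(120, 500, 900, "earliest") returns 500 and offsets 120 to 499 are never read.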
Final thoughts:
So is there another config parameter we could use? (Something like "throw exception if offset is missing/skipped, but not on first start"?) Or is there a JMX metric we could monitor in case a consumer is skipping messages?
Answer:
> setting auto.offset.reset to none for the consumer and expected the consumer to stop in case it can't find an offset
That's what it'll do, yes.
> In practice it immediately throws an exception when the consumer gets instantiated because there's no first/initial offset
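You'll need to actually initialize the group first, then seek it to the earliest offset, e.g. kafka-consumer-groups --bootstrap-server <broker> --group connect-<name> --topic <topic> --reset-offsets --to-earliest --execute. The connector must not be running while you do this, since offsets can only be reset for an inactive group.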
> Something like "throw exception if offset is missing/skipped, but not on first start"?
There's nothing that differentiates auto.offset.reset between "first" and "next" starts. But you could create the connector with consumer.override.auto.offset.reset=earliest (the worker's connector.client.config.override.policy must allow such overrides), wait for it to be running and to have committed offsets, then set it back to none with a PUT /connectors/<name>/config call. Then repeat whenever it stops running again.
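This create/flip/repeat procedure could be scripted against the Kafka Connect REST API. The sketch below only builds the pieces (config override, status check, PUT request) and leaves sending and polling to the caller. The helper names are hypothetical; GET /connectors/<name>/status and PUT /connectors/<name>/config are the standard Connect REST endpoints, and the PUT body is the connector's complete flat config map, which replaces the existing one.

```python
import json
import urllib.request
from urllib.parse import quote

OVERRIDE_KEY = "consumer.override.auto.offset.reset"


def with_offset_reset(config: dict, policy: str) -> dict:
    """Return a copy of a connector config with the consumer reset policy overridden."""
    updated = dict(config)
    updated[OVERRIDE_KEY] = policy
    return updated


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks report RUNNING.

    `status` is the parsed JSON body of GET /connectors/<name>/status.
    """
    tasks = status.get("tasks", [])
    return (status.get("connector", {}).get("state") == "RUNNING"
            and bool(tasks)
            and all(t.get("state") == "RUNNING" for t in tasks))


def build_put_config(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{quote(name, safe='')}/config",
        data=json.dumps(config).encode("utf-8"),  # flat config map, not wrapped
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
```

In use, you would create the connector with with_offset_reset(cfg, "earliest"), poll the status endpoint until is_running returns True and the group has committed offsets, then send build_put_config(base_url, name, with_offset_reset(cfg, "none")) with urllib.request.urlopen.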
> JMX metric we could monitor in case a consumer is skipping messages
Not that I know of; the metrics are mostly reporting bytes processed. You'd have to additionally track how many bytes you expect it to read.
You'd need other monitoring solutions to detect log segments being deleted on the broker, and to compare those deleted offset ranges against the offsets your consumer is currently reading.
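The offset comparison can be sketched as a check over two offset maps. check_retention_gap is a hypothetical helper, not a Kafka API, and it assumes you already collect the group's committed offsets and each partition's earliest (log-start) offset, for example with the Java AdminClient's listConsumerGroupOffsets and listOffsets with OffsetSpec.earliest(). Besides flagging data that is already gone, it reports partitions that are close to the log start, which gives a warning before anything is skipped.

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition)


def check_retention_gap(committed: Dict[TopicPartition, int],
                        log_start: Dict[TopicPartition, int],
                        margin: int = 0) -> Dict[TopicPartition, str]:
    """Classify each partition the group has committed offsets for.

    committed: next offset the group will read, per partition
    log_start: broker's current earliest retained offset, per partition
    margin:    offsets of headroom below which a partition counts as at risk

    Returns "LOST" if retention already deleted records the group had not
    read, "AT_RISK" if the group is within `margin` offsets of the log start,
    else "OK".
    """
    result = {}
    for tp, next_offset in committed.items():
        earliest = log_start.get(tp)
        if earliest is None:
            continue  # no broker data for this partition; skip rather than guess
        # Retained records the group has already consumed; retention eats
        # into this buffer before it reaches unread data.
        headroom = next_offset - earliest
        if headroom < 0:
            result[tp] = "LOST"  # offsets [next_offset, earliest) are gone
        elif headroom < margin:
            result[tp] = "AT_RISK"
        else:
            result[tp] = "OK"
    return result
```

Because retention deletes whole segments, the log-start offset advances in jumps, so a margin smaller than one segment's worth of records gives little warning. A "LOST" result is also only visible until a consumer using earliest resets and commits a new offset, which makes "AT_RISK" the more dependable alert.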