Flink offsets in an inconsistent state after manually resetting Kafka offsets


We have a Flink streaming app that reads messages from Kafka. For various reasons a huge backlog had built up, so we reset the Kafka consumer group's offsets to latest using the Kafka offset reset command. We wanted the Flink app to skip all of these messages and start from the new messages that arrive after the reset.

The problem is that, because Flink manages its offsets internally, it is not aware of this reset. It keeps reading from the old position (the offset before the reset), and it can no longer commit offsets either. As a result, every restart of the Flink app reads again from the same point, so we get duplicate messages on every restart.

I understand that we should not reset offsets manually in a Flink Kafka app. But how do we recover from this?

I have also tried setting auto.offset.reset to latest, but it still reads those messages again.
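For reference: the Kafka consumer property that controls this is `auto.offset.reset`, and it is only consulted when the consumer group has no committed offset for a partition, or when the committed offset is out of range (for example, already deleted by retention). A valid committed offset always takes precedence. The sketch below is a hypothetical, simplified model of that rule in plain Java; the class, enum and method names are made up for illustration, and it is not the Kafka client's implementation.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how a plain Kafka consumer chooses its
// starting offset for one partition. Not the real KafkaConsumer code.
public class OffsetResetModel {

    // Models the earliest / latest / none values of auto.offset.reset.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed the group's committed offset for the partition, if any
     * @param logStart  first offset still retained in the partition
     * @param logEnd    offset that the next produced record will get
     * @param policy    the configured auto.offset.reset value
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            // A committed offset inside the retained range always wins;
            // auto.offset.reset is not consulted at all.
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No committed offset, or it is out of range: apply auto.offset.reset.
        switch (policy) {
            case EARLIEST: return logStart;
            case LATEST:   return logEnd;
            default:
                throw new IllegalStateException("no valid offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // The group already committed offset 1000; setting LATEST changes nothing.
        System.out.println(startingOffset(OptionalLong.of(1000), 0, 5000, ResetPolicy.LATEST)); // 1000
        // No committed offset: LATEST jumps to the end of the log.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 5000, ResetPolicy.LATEST));  // 5000
    }
}
```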


Solution

  • Only when Flink is recovering from a failure or being manually restarted from a savepoint or checkpoint will it use the offsets recorded in a checkpoint or savepoint.

    Otherwise, the Flink Kafka consumers will start reading from the consumer group's committed offsets in the Kafka brokers, or from the start position you have explicitly specified in your code using one of:

    myConsumer.setStartFromEarliest();     // start from the earliest record possible
    myConsumer.setStartFromLatest();       // start from the latest record
    myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (msecs)
    myConsumer.setStartFromGroupOffsets(); // the default behaviour
    

    I'm not sure how to reconcile these facts with what you have reported.
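The precedence described in this answer can be sketched as a small model. Flink's documentation also notes that the setStartFrom* methods do not affect the start position when the job is automatically restored from a failure or manually restored from a savepoint; the offsets stored in the checkpoint or savepoint are used instead. With setStartFromGroupOffsets(), a partition with no committed offset falls back to auto.offset.reset. Everything below (class, enum, method and parameter names) is hypothetical, setStartFromTimestamp(...) and out-of-range offsets are left out, and this is a simplification rather than Flink's actual code.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of how the FlinkKafkaConsumer start position
// is chosen for one partition. Not Flink's implementation.
public class FlinkStartPositionModel {

    // Corresponds to setStartFromEarliest / setStartFromLatest / setStartFromGroupOffsets.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    static long startingOffset(
            OptionalLong restoredFromState, // offset stored in a checkpoint/savepoint, if the job was restored
            StartupMode mode,               // start position configured in code
            OptionalLong groupCommitted,    // the group's committed offset in Kafka, if any
            boolean resetToLatest,          // auto.offset.reset: true = latest, false = earliest
            long logStart,
            long logEnd) {
        // 1. Restored state wins: the startup mode and the Kafka group offsets are ignored.
        if (restoredFromState.isPresent()) {
            return restoredFromState.getAsLong();
        }
        // 2. No restored state: honor the configured startup mode.
        switch (mode) {
            case EARLIEST: return logStart;
            case LATEST:   return logEnd;
            default:
                // GROUP_OFFSETS: use the committed offset, else fall back to auto.offset.reset.
                return groupCommitted.orElse(resetToLatest ? logEnd : logStart);
        }
    }

    public static void main(String[] args) {
        // Restored from a checkpoint taken at offset 1000: the reset to 5000 is ignored.
        System.out.println(startingOffset(OptionalLong.of(1000), StartupMode.GROUP_OFFSETS,
                OptionalLong.of(5000), true, 0, 5000)); // 1000
        // Started without restored state and with setStartFromLatest().
        System.out.println(startingOffset(OptionalLong.empty(), StartupMode.LATEST,
                OptionalLong.empty(), false, 0, 5000)); // 5000
        // Started without restored state, using the group offsets reset in Kafka.
        System.out.println(startingOffset(OptionalLong.empty(), StartupMode.GROUP_OFFSETS,
                OptionalLong.of(5000), false, 0, 5000)); // 5000
    }
}
```

Under this model, restoring the job from a checkpoint or savepoint resumes at the old position no matter what the Kafka group offsets say, whereas starting without restored state honors setStartFromLatest() or the group's committed offsets.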