I use FlinkKafkaConsumer to consume from Kafka with checkpointing enabled. I'm a little confused about how offset management and the checkpoint mechanism interact.
I already know that Flink will start reading partitions from the consumer group's committed offsets in Kafka:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
I also know that the offsets are stored in the checkpoint on a remote file system:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance
What happens if I stop the application by executing yarn application -kill appid and then start it again with a command like ./bin/flink run ...?
Will Flink get the offsets from the checkpoint or from the group.id offsets managed by Kafka?
If you run the job again without specifying a savepoint ($ bin/flink run -s :savepointPath [:runArgs]), Flink will try to get the offsets of your consumer group from Kafka (in older versions, from ZooKeeper). But you will lose all other state of your Flink job (which might not matter if your job is stateless).
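The lookup order described above can be modelled in a few lines of plain Java. This is only a sketch of the decision, not Flink's actual implementation: the class StartOffsetSketch, the enum Source and the method resolve are hypothetical names made up for illustration. It assumes the default start position (group offsets) and that Kafka's auto.offset.reset setting applies when the group has no committed offset.

```java
import java.util.Optional;

// Hypothetical model of where the Kafka source takes its start offset from.
// It does not use any Flink classes; it only mirrors the order of precedence.
final class StartOffsetSketch {

    enum Source { RESTORED_STATE, KAFKA_GROUP_OFFSET, AUTO_OFFSET_RESET }

    // restoredOffset: offset found in the savepoint/checkpoint the job was started from
    //                 (empty when the job is started without -s).
    // committedGroupOffset: offset committed to Kafka for the consumer's group.id
    //                 (empty when the group has never committed for this partition).
    static Source resolve(Optional<Long> restoredOffset, Optional<Long> committedGroupOffset) {
        if (restoredOffset.isPresent()) {
            return Source.RESTORED_STATE;      // state restored from a snapshot wins
        }
        if (committedGroupOffset.isPresent()) {
            return Source.KAFKA_GROUP_OFFSET;  // fresh start: fall back to Kafka's group offsets
        }
        return Source.AUTO_OFFSET_RESET;       // nothing committed: auto.offset.reset decides
    }

    public static void main(String[] args) {
        // Killed with "yarn application -kill" and restarted without -s:
        System.out.println(resolve(Optional.empty(), Optional.of(17L)));
        // Restarted with -s pointing at a snapshot:
        System.out.println(resolve(Optional.of(42L), Optional.of(17L)));
    }
}
```

The first call corresponds to the situation in the question: no snapshot is passed on restart, so the offsets committed to Kafka are the ones that count.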
I must admit that this behaviour is quite confusing. By default, starting a job without a savepoint is like starting from zero. As far as I know, only the implementation of the Kafka source differs from that behaviour. If you want to change it, replace the default setStartFromGroupOffsets() on the FlinkKafkaConsumer[08/09/10] with another start position, for example setStartFromEarliest() or setStartFromLatest(). This is described here: Kafka Consumers Start Position Configuration
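As a rough configuration sketch, this is how a start position could be chosen explicitly. It assumes the universal FlinkKafkaConsumer from the flink-connector-kafka dependency is on the classpath. The broker address, group id, topic name, checkpoint interval and the class name StartPositionSketch are placeholders, not values from the question.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical job class; requires Flink and its Kafka connector to compile.
public class StartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // placeholder interval: one checkpoint per minute

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "my-group");                // placeholder group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Default: start from the offsets committed in Kafka for group.id.
        consumer.setStartFromGroupOffsets();
        // Alternatives that ignore the committed group offsets:
        // consumer.setStartFromEarliest();
        // consumer.setStartFromLatest();

        env.addSource(consumer).print();
        env.execute("start-position-sketch");
    }
}
```

Whichever start position is configured, it only applies when the job starts without restored state. When the job is resumed from a savepoint or checkpoint, the offsets stored in that snapshot are used instead.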
It might be worth having a closer look at the Flink documentation: What is a savepoint and how does it differ from checkpoints.
In a nutshell:
Checkpoints:
The primary purpose of Checkpoints is to provide a recovery mechanism in case of unexpected job failures. A Checkpoint’s lifecycle is managed by Flink.
Savepoints:
Savepoints are created, owned, and deleted by the user. Their use case is planned, manual backup and resume.
There are ongoing discussions on how to "unify" savepoints and checkpoints. You can find a lot of technical details here: Flink Improvement Proposal 47 (FLIP-47): Checkpoints vs. Savepoints