
Will Flink resume from the last offset after executing yarn application -kill and running the job again?


I use FlinkKafkaConsumer to consume from Kafka with checkpointing enabled. Now I'm a little confused about offset management and the checkpoint mechanism. I already know that Flink will start reading partitions from the consumer group's committed offsets (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration) and that the offsets are stored in the checkpoint on a remote file system (https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-fault-tolerance).

What happens if I stop the application by executing yarn application -kill appid and then run the start command again, e.g. ./bin/flink run ...? Will Flink take the offsets from the checkpoint or from the group.id offsets managed by Kafka?


Solution

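  • If you run the job again without passing a savepoint ($ bin/flink run -s :savepointPath [:runArgs]), Flink will try to get the offsets of your consumer group from Kafka (in older versions, from ZooKeeper). With checkpointing enabled, the consumer commits its offsets back to Kafka each time a checkpoint completes, so these are roughly the offsets of the last completed checkpoint. However, you will lose all other state of your Flink job (which might not matter if your job is stateless).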

    I must admit that this behaviour is quite confusing. By default, starting a job without a savepoint is like starting from zero: no state is restored. As far as I know, only the Kafka source deviates from that, because it falls back to the offsets committed in Kafka. If you want to change that behaviour, replace the default setStartFromGroupOffsets() on your FlinkKafkaConsumer[08/09/10] with another start position, such as setStartFromEarliest() or setStartFromLatest(). This is described here: Kafka Consumers Start Position Configuration

    It might be worth taking a closer look at the Flink documentation: What is a savepoint and how does it differ from checkpoints?

    In a nutshell

    Checkpoints:

    The primary purpose of checkpoints is to provide a recovery mechanism in case of unexpected job failures. A checkpoint's lifecycle is managed by Flink.

    Savepoints:

    Savepoints are created, owned, and deleted by the user. Their use case is planned, manual backup and resume.

    There are currently ongoing discussions on how to "unify" savepoints and checkpoints. You can find a lot of technical details here: FLIP-47 (Flink Improvement Proposal 47): Checkpoints vs. Savepoints
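The precedence described above can be summarized in a small, self-contained Java model. This is not Flink code: the class StartOffsetModel, the method resolve(), and the enums StartupMode and AutoOffsetReset are hypothetical names chosen for illustration, and the model ignores details such as out-of-range committed offsets and newly discovered partitions. It assumes, following the Flink Kafka connector documentation, that offsets restored from a savepoint or checkpoint always take precedence over the configured start position, and that the group-offsets mode falls back to auto.offset.reset when the group has no committed offset.

```java
import java.util.Optional;

/**
 * Hypothetical model (not Flink code) of how a FlinkKafkaConsumer picks the
 * offset it starts reading a single partition from.
 */
public class StartOffsetModel {

    /** Subset of the consumer's setStartFrom...() options; GROUP_OFFSETS is the default. */
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    /** Stand-in for the Kafka consumer property auto.offset.reset. */
    enum AutoOffsetReset { EARLIEST, LATEST }

    /**
     * @param restoredPosition     resume position recorded in the savepoint/checkpoint
     *                             passed via "flink run -s", or empty for a fresh start
     * @param mode                 start position configured on the consumer
     * @param committedGroupOffset offset committed in Kafka for the group.id, if any
     * @param reset                fallback used when no committed offset exists
     * @param earliest             first offset still available in the partition
     * @param latest               offset the next produced record will receive
     */
    static long resolve(Optional<Long> restoredPosition, StartupMode mode,
                        Optional<Long> committedGroupOffset, AutoOffsetReset reset,
                        long earliest, long latest) {
        // 1. Restoring from a savepoint or retained checkpoint: the offsets in
        //    the Flink state win and the configured start position is ignored.
        if (restoredPosition.isPresent()) {
            return restoredPosition.get();
        }
        // 2. Fresh start (no -s): the configured start position decides.
        if (mode == StartupMode.EARLIEST) {
            return earliest;
        }
        if (mode == StartupMode.LATEST) {
            return latest;
        }
        // 3. GROUP_OFFSETS: use the group's committed offset, else auto.offset.reset.
        return committedGroupOffset.orElse(reset == AutoOffsetReset.EARLIEST ? earliest : latest);
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 5_000L;

        // yarn application -kill, then "flink run" without -s:
        // resumes from the offset committed to Kafka (here 1200).
        System.out.println(resolve(Optional.empty(), StartupMode.GROUP_OFFSETS,
                Optional.of(1_200L), AutoOffsetReset.LATEST, earliest, latest));

        // "flink run -s <savepointPath>": the savepoint's position (here 1150)
        // wins even though the consumer is configured to start from earliest.
        System.out.println(resolve(Optional.of(1_150L), StartupMode.EARLIEST,
                Optional.of(1_200L), AutoOffsetReset.LATEST, earliest, latest));

        // No -s and no committed offset for this group.id: auto.offset.reset decides.
        System.out.println(resolve(Optional.empty(), StartupMode.GROUP_OFFSETS,
                Optional.empty(), AutoOffsetReset.EARLIEST, earliest, latest));
    }
}
```

Running main prints 1200, 1150 and 0: the plain restart after yarn application -kill resumes from the group's committed offset, while restarting with -s uses the position stored in the savepoint even though the consumer is configured to start from earliest.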