apache-kafka, apache-flink, flink-streaming, apache-kafka-mirrormaker

Kafka Migration with MM2 and Flink: How to Handle Offset Changes and Savepoints?


I'm currently migrating a Kafka cluster using MirrorMaker 2 (MM2). My plan is to switch my Flink Kafka connector to the new cluster once the migration is complete.

Ideally, I want Flink to resume consumption from where it left off. However, I'm concerned about offset changes after the migration: MM2 does not preserve absolute offsets, so the topic-partition-offset-states information stored in the savepoint will point at the wrong positions on the new cluster. How should I handle this situation?

Are there any recommended solutions or best practices for managing Flink savepoints and offset changes during a Kafka migration with MM2? Any advice or insights would be greatly appreciated!

It seems like I need an external program that can:

Load the topic-partition-offset-states information from the Flink savepoint file.

Read the corresponding MM2 checkpoint topics to translate the old offsets to the new ones in the migrated cluster.

Modify the savepoint file with the translated offsets.

Is this a viable approach? What potential risks or challenges should I be aware of if I go down this path? A rough sketch of the translation step I have in mind follows.
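
For the translation step specifically, Kafka's connect-mirror-client already ships a helper, RemoteClusterUtils.translateOffsets, which reads MM2's <alias>.checkpoints.internal topic on the target cluster. An untested sketch of that piece; the broker address, the MM2 source-cluster alias ("source"), and the group id ("my-flink-group") are placeholders:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsetsSketch {
    public static void main(String[] args) throws Exception {
        // Client properties pointing at the *target* (new) cluster,
        // which hosts MM2's <alias>.checkpoints.internal topic.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "new-cluster:9092"); // placeholder address

        // "source" = the source-cluster alias from the MM2 config;
        // "my-flink-group" = the consumer group whose offsets to translate.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                props, "source", "my-flink-group", Duration.ofSeconds(30));

        translated.forEach((tp, meta) ->
            System.out.printf("%s -> %d%n", tp, meta.offset()));
    }
}
```

One caveat I can see: this only translates offsets that the group has actually committed to the source cluster, not offsets held solely inside Flink's savepoint state.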


Solution

  • I think you should follow exactly the same process that is documented for upgrading your Flink installation to the latest connector version at https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version, in other words (a sketch of the resulting consumer configuration follows this list):

    • Make sure you have a group.id configured for your Consumer.
    • Set setCommitOffsetsOnCheckpoints(true) on the consumer so that read offsets are committed to Kafka. It’s important to do this before stopping and taking the savepoint. You might have to do a stop/restart cycle on the old connector version to enable this setting.
    • Set setStartFromGroupOffsets(true) on the consumer so that we get read offsets from Kafka. This will only take effect when there is no read offset in Flink state, which is why the next step is very important.
    • Change the assigned uid of your source/sink. This makes sure the new source/sink doesn’t read state from the old source/sink operators.
    • Start the new job with --allow-non-restored-state because we still have the state of the previous connector version in the savepoint.
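
Putting those steps together, a minimal sketch, assuming the legacy FlinkKafkaConsumer API that these setters belong to (note that setStartFromGroupOffsets actually takes no argument); the broker address, topic, group id, and uid below are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MigratedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed on completed checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "new-cluster:9092"); // placeholder: new cluster
        props.setProperty("group.id", "my-flink-job");              // required for committed offsets

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // commit read offsets to Kafka on checkpoints
        consumer.setStartFromGroupOffsets();          // fall back to committed group offsets
                                                      // when there is no offset in Flink state

        DataStream<String> stream = env
            .addSource(consumer)
            .uid("kafka-source-v2"); // new uid so the old source's state is not restored

        stream.print();
        env.execute("migrated-kafka-job");
    }
}
```

When resubmitting from the savepoint, allow-non-restored-state corresponds to the -n/--allowNonRestoredState option of flink run, so the orphaned state under the old source uid is skipped instead of failing the restore.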