apache-kafka apache-flink kafka-consumer-api flink-streaming

Can the new Flink Kafka consumer (KafkaSource) start from the old FlinkKafkaConsumer's savepoint/checkpoint?


I have a job that runs with the old Flink Kafka consumer (FlinkKafkaConsumer), and I want to migrate it to KafkaSource. I am not sure what the impact of this migration will be. I want my job to start from the latest successful checkpoint taken with the old FlinkKafkaConsumer. Is that possible? If not, what is the right way to migrate the Kafka consumer?


Solution

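  • Assuming the same configuration, the two should be interchangeable as long as the group-id configured on the new source matches the one used by your earlier implementation. This relies on the offsets committed to Kafka under that group id (FlinkKafkaConsumer commits them on completed checkpoints by default when checkpointing is enabled). Use it in conjunction with OffsetsInitializer.committedOffsets() to ensure that you continue reading from the offsets that were previously committed; OffsetsInitializer.latest() would instead start from the end of each partition and skip anything not yet read:

    KafkaSource.<YourExampleClass>builder()
       ...
       .setGroupId("your-previous-group-id")
       // Resume from the offsets committed to Kafka for this group id
       .setStartingOffsets(OffsetsInitializer.committedOffsets())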
    

    While the two should just work, depending on your specific pipeline and how it uses parallelism you may run into some of the differences between FlinkKafkaConsumer and the newer KafkaSource:

    the KafkaSource behaves differently than FlinkKafkaConsumer in the case where the number of Kafka partitions is smaller than the parallelism of Flink's Kafka Source operator.
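
To make the "fewer partitions than parallelism" case concrete, here is a minimal sketch in plain Java. It assumes a simple round-robin assignment, which is a simplification and not necessarily how KafkaSource actually distributes partitions; PartitionAssignmentSketch, assign and idleSubtasks are hypothetical names, not Flink APIs. It only illustrates that some source subtasks end up with no partition whenever the partition count is below the operator parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: a simplified round-robin model of
// how Kafka partitions could be spread across source subtasks.
class PartitionAssignmentSketch {

    // For each subtask index, returns the partition ids it would own under
    // plain round-robin assignment (partition p goes to subtask p % parallelism).
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % parallelism).add(p);
        }
        return owned;
    }

    // Returns the indices of subtasks that own no partition at all.
    static List<Integer> idleSubtasks(int numPartitions, int parallelism) {
        List<Integer> idle = new ArrayList<>();
        List<List<Integer>> owned = assign(numPartitions, parallelism);
        for (int s = 0; s < parallelism; s++) {
            if (owned.get(s).isEmpty()) {
                idle.add(s);
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        // 2 partitions read with parallelism 4
        System.out.println("assignment:    " + assign(2, 4));
        System.out.println("idle subtasks: " + idleSubtasks(2, 4));
    }
}
```

With 2 partitions and a parallelism of 4, the sketch leaves subtasks 2 and 3 without any partition. Those partition-less subtasks are where you would want to check the job's behavior after switching to KafkaSource.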