
FLINK: Kafka Source - restart policy when a new topic is discovered at restart


I have a Flink job whose Kafka source is configured to subscribe to every topic matching a regex, something like:

String topicPattern = "^(topic1|topic2|topic3)$";

The Kafka consumer's start position is configured with setStartFromLatest, something like:

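// Assumes String records; SimpleStringSchema stands in for the job's real deserializer.
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(
    Pattern.compile(topicPattern), new SimpleStringSchema(), someProperties);
myConsumer.setStartFromLatest();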

We pass topicPattern in through configuration. Occasionally a new Kafka producer starts writing to, say, topic4. We then update the configuration to include the new topic and restart the job from a savepoint.
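
To make the effect of the configuration change concrete, here is a small sketch using only java.util.regex. It assumes a topic is subscribed when the whole topic name matches the pattern. The class name TopicPatternSketch, the method matchingTopics, and the topic list are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternSketch {

    // Returns the topics whose full name matches the regex.
    // Assumption: the source subscribes to a topic only on a full-name match.
    static List<String> matchingTopics(String regex, List<String> clusterTopics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : clusterTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topics present on the cluster.
        List<String> clusterTopics = Arrays.asList("topic1", "topic2", "topic3", "topic4");

        // Pattern before the configuration change: topic4 is ignored.
        System.out.println(matchingTopics("^(topic1|topic2|topic3)$", clusterTopics));

        // Pattern after the configuration change: topic4 is now picked up.
        System.out.println(matchingTopics("^(topic1|topic2|topic3|topic4)$", clusterTopics));
    }
}
```

After the restart with the second pattern, topic4 is a topic the job has never seen before, so the savepoint holds no offsets for it.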

In this case we noticed that the Kafka source reads the new topic4 from the beginning. Can anyone explain why? Is Kafka's auto.offset.reset property kicking in?


Solution

  • As far as I can see, this is how FlinkKafkaConsumer currently works: when it is restored from a savepoint, every topic that was not part of the savepoint automatically gets the EARLIEST start offset. This is most probably a bug, so I am creating a bug report for it.
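
The behavior described in this answer can be modeled with a short, self-contained sketch. This is a toy model, not Flink's actual code: the class RestoreOffsetsSketch, the method resolveStartOffsets, the sentinel values, and the partition names are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RestoreOffsetsSketch {

    // Hypothetical sentinel values meaning "start from earliest" and "start from latest".
    static final long EARLIEST = -1L;
    static final long LATEST = -2L;

    // Decides the start offset for each discovered partition.
    // restored == null models a fresh start (no savepoint).
    static Map<String, Long> resolveStartOffsets(
            List<String> discoveredPartitions,
            Map<String, Long> restored,
            long configuredStartPosition) {
        Map<String, Long> startOffsets = new TreeMap<>();
        for (String partition : discoveredPartitions) {
            if (restored == null) {
                // Fresh start: the configured start position (e.g. latest) applies.
                startOffsets.put(partition, configuredStartPosition);
            } else {
                // Restore: known partitions resume from the saved offset,
                // unknown ones fall back to EARLIEST regardless of configuration.
                startOffsets.put(partition, restored.getOrDefault(partition, EARLIEST));
            }
        }
        return startOffsets;
    }

    public static void main(String[] args) {
        // Offsets stored in the savepoint; topic4 did not exist in the pattern yet.
        Map<String, Long> savepoint = new HashMap<>();
        savepoint.put("topic1-0", 120L);
        savepoint.put("topic2-0", 75L);

        List<String> discovered = Arrays.asList("topic1-0", "topic2-0", "topic4-0");
        System.out.println(resolveStartOffsets(discovered, savepoint, LATEST));
    }
}
```

In this model the configured start position (setStartFromLatest in the question) only matters on a fresh start. Once state is restored, a partition missing from the savepoint is treated as newly discovered and read from the beginning, which matches what the question observes for topic4.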