The use case is a pipeline that reads from a changing list of topics, processes the records, and writes to a different Kafka topic. There are currently 150 Kafka topics, but this list can change. An API returns the current set of topics (topics can be added and removed). I want to pick up these changes every minute (although they happen only a few times a day).
Using Flink, I can use Dynamic Partition Discovery with an interval of 1 minute, which seems to fit the use case.
But how do I change the set of topics I'm subscribed to (without restarting the job, of course)?
val topics = List("a", "b", "c")
KafkaSource.builder[util.HashMap[String, Object]]
.setBootstrapServers(kafkaSourceBootstrapServers)
.setGroupId(kafkaJobConsumerGroup)
.setTopics(topics:_*)
....
.build
The above returns a KafkaSource[T], but I want to change the topics list every minute (basically, fetch the topics value from some API).
How can I do so?
It seems the only APIs I can use are:
env.fromSource()
env.addSource()
But these will create a different DataStream[T], and I already have a stream running.
How can I change the topics list while my job is still running? Or is it not possible, and I can't avoid a restart?
The only thing that might work would be to use a topic pattern:
KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
But whether that's going to satisfy your requirements depends on whether or not you can arrange to have the topics you want to consume from follow some predictable naming pattern.
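To check whether a naming convention would work before touching the job, you can test a candidate pattern against your topic names with plain java.util.regex. The following is only a minimal sketch: the class name, the helper matchingTopics, and the topic names are hypothetical, and it assumes the subscription selects topics whose full name matches the pattern (it does not call Flink at all).

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {

    // Hypothetical helper: returns the topic names that the pattern matches in full.
    // Assumption: a topic-pattern subscription picks topics by full-name match.
    public static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names, standing in for what your API would return.
        List<String> topics = List.of(
                "topic.orders", "topic.payments", "audit.topic.orders", "metrics");

        Pattern pattern = Pattern.compile("topic.*");

        // Only names that start with "topic" match in full.
        System.out.println(matchingTopics(pattern, topics));
        // prints [topic.orders, topic.payments]
    }
}
```

If the pattern selects exactly the topics you want, then with partition discovery enabled (the partition.discovery.interval.ms property on the source) newly created topics that match should be picked up while the job keeps running. Note that "." in a regex matches any character, so "topic.*" would also match a topic named "topics"; use "topic\\..*" if the dot should be literal.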