apache-kafka · apache-flink · flink-streaming

Changing Flink job topics dynamically mid-job


The use case: a pipeline that reads from a changing list of topics, processes them, and writes to a different Kafka topic. There are currently 150 Kafka topics, but the list can change, and an API returns the current set (topics can be added and removed). I want to pick up these changes every minute (although they only happen a few times a day).

Using Flink, I can enable Dynamic Partition Discovery with an interval of 1 minute - this seems to answer the use case.
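For context, a minimal sketch of how that interval would be configured - assuming Flink's `partition.discovery.interval.ms` consumer property, which is what the KafkaSource builder reads:

```scala
import java.util.Properties

// Sketch: partition discovery is enabled by setting the
// "partition.discovery.interval.ms" property; 60000 ms = check every minute.
val props = new Properties()
props.setProperty("partition.discovery.interval.ms", "60000")

// These properties would then be handed to the builder, e.g.
//   KafkaSource.builder[...].setProperties(props)
```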

But how do I change the set of topics I'm subscribed to (without restarting the job, of course)?

    val topics = List("a", "b", "c")
    KafkaSource.builder[util.HashMap[String, Object]]
      .setBootstrapServers(kafkaSourceBootstrapServers)
      .setGroupId(kafkaJobConsumerGroup)
      .setTopics(topics: _*)
      ....
      .build

The above returns a KafkaSource[T], but I want to refresh the topics list every minute (basically by fetching the current value from the API).

How can I do so?

It seems the only APIs I can use are:

env.fromSource()
env.addSource()

But each of these creates a new DataStream[T], and I already have a stream running.

How can I change the topics list while my job is still running? Or is that not possible, meaning I can't avoid a restart?


Solution

  • The only thing that might work would be to use a topic pattern:

    KafkaSource.builder().setTopicPattern(Pattern.compile("topic.*"));
    

    With a topic pattern, partition discovery also picks up newly created topics that match the pattern, so no restart is needed. But whether that satisfies your requirements depends on whether you can arrange for the topics you want to consume to follow some predictable naming pattern.
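As a sketch of that idea - assuming a hypothetical `topic-` prefix convention for your topic names - the pattern itself is a plain `java.util.regex.Pattern`, so you can sanity-check it against your topic names before wiring it into the builder:

```scala
import java.util.regex.Pattern

// Hypothetical convention: all consumable topics start with "topic-".
val topicPattern = Pattern.compile("topic-.*")

// Topics created later that match the pattern are picked up by partition
// discovery without a restart; non-matching topics are ignored.
val existing   = List("topic-a", "topic-b")
val addedLater = "topic-new"
val unrelated  = "other-events"

assert(existing.forall(t => topicPattern.matcher(t).matches()))
assert(topicPattern.matcher(addedLater).matches())
assert(!topicPattern.matcher(unrelated).matches())

// In the Flink job this pattern would then be passed to the builder, e.g.
//   KafkaSource.builder[...]
//     .setTopicPattern(topicPattern)
//     .setProperty("partition.discovery.interval.ms", "60000")
```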