apache-kafka apache-flink amazon-kinesis

Apache Flink: Dynamically change the consumer topic


We are building a Flink application that will be deployed to Amazon Kinesis Data Analytics (KDA). The application will consume from Kafka and write to S3. Our setup is as follows:

  1. We have a Kafka bootstrap server (MSK) with several topics.
  2. We are planning to have multiple Flink applications deployed on KDA. All these applications will be part of the same consumer group.

We want to do the following:

  1. Assume we have 10 Kafka topics (topic 1 through topic 10).
  2. Assume we have 5 Flink applications (app 1 through app 5).
  3. Initially we will assign applications to topics (e.g. app 1 will consume from topics 1 and 2, app 2 from topics 3 and 4, and so on).
  4. We will store this mapping in a config system (say, a CRUD application). When a Flink app starts, it should look up which topics to consume from based on its name. (This part we are able to do.)
  5. Assume there is a sudden surge in the number of messages coming through topic 4, for example. We will update the config system so that app 4, which is consuming from topics 7 and 8, consumes from topics 7 and 4 instead.
  6. We want the Flink app to stop consuming from the old topic and start consuming from the new one without being redeployed. We will have a poller that can inform the Flink app that its topic assignment has changed. The problem is making the app actually switch topics without a redeployment.
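
The poller's job in steps 5 and 6 boils down to comparing the app's current assignment with the one stored in the config system. Below is a minimal sketch of that comparison in plain Java. The class `TopicAssignmentDiff`, its method names, and the `topic-N` naming are hypothetical and chosen only for illustration; the sketch does not touch Flink or Kafka and only shows which topics would need to be stopped and started.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: compares the current and desired topic assignment of one app. */
public class TopicAssignmentDiff {

    /** Topics that are consumed now but are absent from the desired assignment. */
    public static Set<String> toStop(Set<String> current, Set<String> desired) {
        Set<String> result = new LinkedHashSet<>(current);
        result.removeAll(desired);
        return result;
    }

    /** Topics that are in the desired assignment but are not consumed yet. */
    public static Set<String> toStart(Set<String> current, Set<String> desired) {
        Set<String> result = new LinkedHashSet<>(desired);
        result.removeAll(current);
        return result;
    }

    public static void main(String[] args) {
        // App 4 from the example: currently on topics 7 and 8, should move to 7 and 4.
        Set<String> current = new LinkedHashSet<>(List.of("topic-7", "topic-8"));
        Set<String> desired = new LinkedHashSet<>(List.of("topic-7", "topic-4"));

        System.out.println("stop:  " + toStop(current, desired));  // stop:  [topic-8]
        System.out.println("start: " + toStart(current, desired)); // start: [topic-4]
    }
}
```

Computing the difference is the easy part; the open question is whether the running Flink job can act on the "stop" set at all.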

Is there any way to do this? As far as my research goes, the only way to make a Flink app read from a new topic is to redeploy it, but I want to check whether someone has figured out another way.

Alternatively: will this situation be handled automatically if we make all 5 Flink applications listen to all 10 topics? That is, if there is a sudden surge in one of the topics, will the Flink applications rebalance themselves to dedicate more resources to the hot topic, since they are all part of the same consumer group?


Solution

  • Flink's Kafka consumer does not support stopping consumption from a topic without a restart. It does support dynamic topic and partition discovery, so a running job can pick up new topics that match a subscribed topic pattern, as well as new partitions of topics it already reads. See https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#dynamic-partition-discovery for details.
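
To make the "add but never remove" behaviour concrete, here is a simplified model in plain Java. It is not Flink's implementation: the class `TopicDiscoveryModel`, the regex, and the topic names are assumptions made for illustration. In an actual job, per the linked documentation, the `KafkaSource` builder takes the regex via `setTopicPattern(...)` and enables periodic discovery via the `partition.discovery.interval.ms` property; check the page for the Flink version you run.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/** Simplified model of pattern-based topic discovery. This is NOT Flink's code. */
public class TopicDiscoveryModel {
    private final Pattern topicPattern;
    private final Set<String> subscribed = new TreeSet<>();

    public TopicDiscoveryModel(String regex) {
        this.topicPattern = Pattern.compile(regex);
    }

    /**
     * One discovery round: every topic that matches the pattern and is not yet
     * subscribed gets added. Already subscribed topics are never dropped, which
     * models the answer's point that consumption cannot be stopped without a restart.
     */
    public Set<String> discover(Set<String> topicsOnBroker) {
        Set<String> added = new TreeSet<>();
        for (String topic : topicsOnBroker) {
            if (topicPattern.matcher(topic).matches() && subscribed.add(topic)) {
                added.add(topic);
            }
        }
        return added;
    }

    /** Snapshot of everything subscribed so far. */
    public Set<String> subscribed() {
        return new TreeSet<>(subscribed);
    }

    public static void main(String[] args) {
        TopicDiscoveryModel model = new TopicDiscoveryModel("topic-\\d+");

        // Round 1: two matching topics exist; "audit-log" does not match the pattern.
        System.out.println(model.discover(Set.of("topic-1", "topic-2", "audit-log"))); // [topic-1, topic-2]

        // Round 2: topic-3 has appeared and topic-2 is missing from the listing.
        System.out.println(model.discover(Set.of("topic-1", "topic-3"))); // [topic-3]

        // The subscription only grew; topic-2 is still part of it.
        System.out.println(model.subscribed()); // [topic-1, topic-2, topic-3]
    }
}
```

Under this model, moving app 4 onto topic 4 could be achieved by discovery only if topic 4 matched the app's pattern from the start, and releasing topic 8 would still require a restart.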