java · configuration · apache-flink · flink-streaming · distributed-system

How can I update a configuration in a Flink transformation?


Given a Flink streaming job that applies a map() operation to the stream.

This map() operation reads its configuration from some properties and maps the data accordingly. For example, the configuration specifies that the attribute "input" is to be read and written back to the stream under a different attribute name, "output". This already works fine.
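For illustration, here is a minimal sketch of what such a property-driven map() might look like; the class name, the property keys "input" and "output", and the Map-based record type are assumptions, not taken from the actual job:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;

    // Hypothetical mapper: renames the attribute named by the "input" property
    // to the name given by the "output" property.
    public class AttributeRenamingMapper
            implements MapFunction<Map<String, String>, Map<String, String>> {

        private final String inputName;
        private final String outputName;

        public AttributeRenamingMapper(Properties config) {
            // The properties are read once when the job graph is built;
            // running tasks never see later changes to them.
            this.inputName = config.getProperty("input");
            this.outputName = config.getProperty("output");
        }

        @Override
        public Map<String, String> map(Map<String, String> record) {
            Map<String, String> result = new HashMap<>(record);
            if (result.containsKey(inputName)) {
                result.put(outputName, result.remove(inputName));
            }
            return result;
        }
    }

The point of the sketch is that the configuration is captured when the operator is constructed, which is why a change currently requires a redeployment.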

Now the configuration changes; for example, the transformation should use a different attribute name for the output.

Therefore I am looking for a way to make all Flink tasks re-read the new configuration at run-time.

Is there a possibility

  • to suspend a KafkaSource
  • wait until the pipeline has drained (flush)
  • trigger all tasks in the cluster to reread a configuration file (coordinated)
  • resume the KafkaSource

programmatically in Flink without redeployment?

In case it matters:

  • I am currently using Flink 1.14, but we have to migrate to 1.15 soon.
  • The job uses checkpoints.
  • The job uses KafkaSource, JdbcSink, KafkaSink as provided by Flink.
  • There are additional custom sinks for JDBC and InfluxDB.

Solution

  • Normally you would broadcast your configuration changes as a stream, which means they would be sent to every parallel instance of the operator that is doing the attribute transformation. See https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/ for an example of applying a Rule to a stream of Shapes, which seems to mimic some of your requirements. A sketch of this pattern follows below.
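A minimal sketch of that pattern, assuming the records are Map<String, String> attribute maps and the configuration updates arrive as (oldName, newName) pairs, e.g. from a separate Kafka topic; all class, method, and field names here are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class ConfigBroadcastExample {

        // Broadcast state holding the current attribute mapping, e.g. "input" -> "output".
        static final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR =
                new MapStateDescriptor<>("attribute-mapping", Types.STRING, Types.STRING);

        public static DataStream<Map<String, String>> applyMapping(
                DataStream<Map<String, String>> events,
                DataStream<Tuple2<String, String>> configUpdates) {

            // Every configuration update is broadcast to all parallel instances
            // of the downstream operator.
            BroadcastStream<Tuple2<String, String>> configBroadcast =
                    configUpdates.broadcast(CONFIG_DESCRIPTOR);

            return events
                    .connect(configBroadcast)
                    .process(new BroadcastProcessFunction<
                            Map<String, String>, Tuple2<String, String>, Map<String, String>>() {

                        @Override
                        public void processElement(Map<String, String> event,
                                                   ReadOnlyContext ctx,
                                                   Collector<Map<String, String>> out) throws Exception {
                            Map<String, String> result = new HashMap<>();
                            for (Map.Entry<String, String> attr : event.entrySet()) {
                                // Look up a renamed attribute; fall back to the original name.
                                String newName =
                                        ctx.getBroadcastState(CONFIG_DESCRIPTOR).get(attr.getKey());
                                result.put(newName != null ? newName : attr.getKey(), attr.getValue());
                            }
                            out.collect(result);
                        }

                        @Override
                        public void processBroadcastElement(Tuple2<String, String> update,
                                                            Context ctx,
                                                            Collector<Map<String, String>> out) throws Exception {
                            // Update the mapping; this runs on every parallel instance,
                            // so all tasks see the same configuration.
                            ctx.getBroadcastState(CONFIG_DESCRIPTOR).put(update.f0, update.f1);
                        }
                    })
                    // Declare the output type explicitly, since the generic Map type
                    // cannot always be inferred from the anonymous function.
                    .returns(Types.MAP(Types.STRING, Types.STRING));
        }
    }

With this approach there is no need to suspend the KafkaSource or drain the pipeline: each parallel instance updates its broadcast state as soon as a configuration element arrives, and that state is checkpointed together with the rest of the job.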