How to configure Flink jobs at runtime?


Is it possible to configure a Flink application at runtime? For example, I have a streaming application that reads its input, applies some transformations, and then filters out all elements below a certain threshold. I want this threshold to be configurable at runtime, so that I can change it without restarting the Flink job. Example code:

DataStream<MyModel> myModelDataStream = // get input ...
  // do some stuff ...
  .filter(new RichFilterFunction<MyModel>() {
    @Override
    public boolean filter(MyModel value) throws Exception {
      return value.someValue() > someGlobalState.getThreshold();
    }
  })
  
// write to some sink ...
DataStream<MyConfig> myConfigDataStream = // get input ...
  // ...
  .process(new ProcessFunction<MyConfig, Void>() {
    @Override
    public void processElement(MyConfig config, Context ctx, Collector<Void> out) {
      someGlobalState.setThreshold(config.getThreshold());
    }
  })

// ...

Is there a way to achieve this, for example with some global state that can be changed through a configuration stream?


Solution

  • Yes, you can do this with a BroadcastProcessFunction. Something roughly like this:

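    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    // Key and value types must both be given as TypeInformation (or both as classes).
    MapStateDescriptor<Void, Threshold> bcStateDescriptor = 
      new MapStateDescriptor<>("thresholds", Types.VOID, TypeInformation.of(Threshold.class));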
    
    DataStream<MyModel> myModelDataStream = // get input ...
    DataStream<Threshold> thresholds = // get input...
    BroadcastStream<Threshold> controlStream = thresholds.broadcast(bcStateDescriptor);
    
    DataStream<MyModel> result = myModelDataStream
      .connect(controlStream)
      .process(new MyFunction());
    
    public class MyFunction extends BroadcastProcessFunction<MyModel, Threshold, MyModel> {
      // Same name and types as the descriptor passed to broadcast(), so both methods see the same state.
      private static final MapStateDescriptor<Void, Threshold> STATE_DESCRIPTOR =
        new MapStateDescriptor<>("thresholds", Types.VOID, TypeInformation.of(Threshold.class));

      @Override
      public void processBroadcastElement(Threshold newThreshold, Context ctx, Collector<MyModel> out) throws Exception {
        // Store the latest threshold under the single null key.
        ctx.getBroadcastState(STATE_DESCRIPTOR).put(null, newThreshold);
      }
    
      @Override
      public void processElement(MyModel model, ReadOnlyContext ctx, Collector<MyModel> out) throws Exception {
        // The state is read-only here; it is empty until the first threshold arrives.
        Threshold threshold = ctx.getBroadcastState(STATE_DESCRIPTOR).get(null);
        if (threshold == null || threshold.value() == null || model.getData() > threshold.value()) {
          out.collect(model);
        }
      }
    }
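
To see how the function above treats interleaved control and data records, here is a minimal plain-Java sketch of the same logic with no Flink dependency. It is only a model: `ThresholdFilter`, `onControl`, and `onData` are hypothetical names, a `long` stands in for both `Threshold` and `MyModel`, and a single field stands in for the broadcast state of one parallel instance. In a real job each parallel instance keeps its own copy of the broadcast state, and Flink does not guarantee any ordering between the two connected streams.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the threshold filter; not Flink code. */
public class ThresholdFilter {
    // Stands in for the broadcast state; null until the first control record arrives.
    private Long threshold;
    // Stands in for the Collector: the values that passed the filter.
    private final List<Long> emitted = new ArrayList<>();

    /** Counterpart of processBroadcastElement: replace the current threshold. */
    public void onControl(long newThreshold) {
        threshold = newThreshold;
    }

    /** Counterpart of processElement: forward the value if no threshold is set or the value exceeds it. */
    public void onData(long value) {
        if (threshold == null || value > threshold) {
            emitted.add(value);
        }
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        ThresholdFilter filter = new ThresholdFilter();
        filter.onData(1L);     // no threshold yet, so the value passes
        filter.onControl(10L); // first threshold arrives
        filter.onData(5L);     // 5 is not above 10, dropped
        filter.onData(11L);    // passes
        filter.onControl(3L);  // threshold lowered while "running"
        filter.onData(5L);     // now passes
        System.out.println(filter.emitted()); // prints [1, 11, 5]
    }
}
```

The first value passes because nothing has been broadcast yet. If records should instead be dropped or buffered until a threshold is known, that choice belongs in the `threshold == null` branch.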