apache-flink flink-streaming

Apache Flink - Update configuration within operator without using Broadcast state


We are using Flink to make an HTTP call for every event, and each call needs certain data that is stored in a database. This data is updated about once a week, and every update has to reach the operator.

Is there any way to update this data within the operator without using a broadcast stream? We are trying to keep the number of streams in our architecture low, and the data changes infrequently.


Solution

  • Possible options:

    A) You could simply use a ProcessFunction with a timer and poll for changes every X minutes.

    B) If your state is small and restarts are not too critical: your server requests will probably fail if you do not update your data (403?). In that case you could just load the data in open(), fail the operator when you receive 403s, and let the job recover; the restart calls open() again and reloads the data.
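Option B can be sketched in plain Java, without any Flink classes. This is only an illustration under assumptions: `FailFastConfig`, its `loader` (standing in for the database query), and the choice of `IllegalStateException` are hypothetical names, and 403 is the status code guessed at above, not a confirmed one.

```java
import java.util.function.Supplier;

/** Hypothetical helper: loads the data once and fails fast when the server rejects it. */
final class FailFastConfig<C> {
    private final Supplier<C> loader; // assumption: stands in for the database query
    private C conf;

    FailFastConfig(Supplier<C> loader) {
        this.loader = loader;
    }

    /** Call from the operator's open(); it runs again after every restart. */
    void open() {
        conf = loader.get();
    }

    /** Returns the data loaded by the most recent open(). */
    C get() {
        if (conf == null) {
            throw new IllegalStateException("open() has not been called");
        }
        return conf;
    }

    /** Call with each HTTP response status; 403 is treated as "data is stale". */
    void check(int httpStatus) {
        if (httpStatus == 403) {
            // Throwing from a user function fails the task, so the job restarts.
            throw new IllegalStateException("HTTP 403: cached data is probably stale");
        }
    }
}
```

Inside a rich function you would call `open()` from the function's own `open()` and `check(...)` on every response. This relies on a restart strategy being configured for the job, and each restart replays events from the last checkpoint.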

    edit:

    An example of how A) could work. Assuming you have

    Source(Record)->MyAsyncFunc(Output)->Sink(Output)

    I'd go and add another function

    Source(Record)->ConfFetcher(Tuple2(Record, Conf))->MyAsyncFunc(Output)->Sink(Output)

    edit2:

    As you pointed out in the comments, a Flink timer is bound to keyed state. For this use case, however, we don't need a Flink timer at all and can just use a plain Java scheduler.

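    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
        // Created in open(): a transient field initializer does not run again after the
        // function is deserialized on the task manager, so the field would be null there.
        private transient ScheduledExecutorService service;
        private transient volatile Conf conf;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            service = Executors.newScheduledThreadPool(1);
            // Initial delay 0: the first pull starts right away on the timer thread,
            // so map() can still see conf == null for the first few records.
            service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
        }

        // Runs on the timer thread. An uncaught exception here cancels all later runs.
        void pullConfig() {
            conf = ...
        }

        @Override
        public Tuple2<T, Conf> map(T value) throws Exception {
            return new Tuple2<>(value, conf);
        }

        @Override
        public void close() throws Exception {
            // Stop the timer thread when the task stops.
            if (service != null) {
                service.shutdownNow();
            }
            super.close();
        }
        ...
    }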
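The periodic pull can also be isolated from Flink and made a bit more defensive. The sketch below is one possible shape, not part of the original answer: `RefreshingValue` and its `loader` are hypothetical names. It loads the first value synchronously, so readers never see null, and it catches loader failures, because `scheduleWithFixedDelay` suppresses all later runs once a run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical holder that reloads a value in the background at a fixed delay. */
final class RefreshingValue<T> implements AutoCloseable {
    private final Supplier<T> loader; // assumption: stands in for the database query
    private final ScheduledExecutorService scheduler;
    private volatile T current;       // volatile: written by the timer thread, read by callers

    RefreshingValue(Supplier<T> loader, long period, TimeUnit unit) {
        this.loader = loader;
        this.current = loader.get();  // first load is synchronous, so get() never returns a missing value
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-refresher");
            t.setDaemon(true);        // do not keep the JVM alive for the refresher
            return t;
        });
        scheduler.scheduleWithFixedDelay(this::refreshQuietly, period, period, unit);
    }

    /** Reloads immediately on the calling thread; loader exceptions propagate. */
    void refresh() {
        current = loader.get();
    }

    /** Scheduled variant: keeps the last good value if the loader fails. */
    private void refreshQuietly() {
        try {
            refresh();
        } catch (RuntimeException e) {
            // Swallowed on purpose: an exception escaping here would cancel future runs.
        }
    }

    T get() {
        return current;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a function like `PullConfig`, the holder would be created in `open()`, read with `get()` in `map()`, and released in `close()`.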