Flink Streaming - How to schedule DataStream to be reprocessed after X minutes?


I have an input data stream of events.

While processing them, I would like to send some of the events back to be reprocessed after several minutes.

Is there a way to achieve this?

Here is a simplified example of what I'm trying to achieve:

var delayedMessagesOutputTag = new OutputTag<Long>("delayedMessagesOutputTag") {};

var inputStream = env.fromElements(1L, 2L, 3L, 4L);
var resolvedElements = processEvents(delayedMessagesOutputTag, inputStream);

var delayedStream = resolvedElements
        .getSideOutput(delayedMessagesOutputTag)
        .process( /*** SOME MAGIC HERE TO CAUSE DELAY ***/);

var resolvedDelayedEvents = processEvents(delayedMessagesOutputTag, delayedStream);

resolvedElements
        .union(resolvedDelayedEvents)
        .addSink(new PrintSinkFunction<>());

env.execute();

....
private SingleOutputStreamOperator<Long> processEvents(OutputTag<Long> delayedMessagesOutputTag, DataStream<Long> inputStream) {
   // Returns resolved events; writes events that should be delayed to delayedMessagesOutputTag
}

Any advice would be highly appreciated.


Solution

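  • You can use Flink timers. Key the side-output stream, keep each delayed event in keyed state, and emit it when a processing-time timer fires: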

    ...
    final var delay = Duration.ofMinutes(1);
    final var delayedStream = resolvedElements.getSideOutput(delayedMessagesOutputTag)
                    .keyBy(new DelayedProcessFunction.KeySelectorImpl())
                    .process(new DelayedProcessFunction(delay));
    ...
    

    An example implementation of DelayedProcessFunction and its KeySelectorImpl:

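    import java.time.Duration;
    import java.util.UUID;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DelayedProcessFunction extends KeyedProcessFunction<String, Long, Long> {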
        private static final long serialVersionUID = 1L;
    
        private final long delayMs;
        // keyed state holding the delayed event for the current key (one event per key)
        private transient ValueState<Long> state;
    
        public DelayedProcessFunction(Duration delay) {
            delayMs = delay.toMillis();
        }
    
        @Override
        public void open(Configuration parameters) {
            // set up the state that holds the delayed event
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Long.class));
        }
    
        @Override
        public void processElement(Long value, KeyedProcessFunction<String, Long, Long>.Context ctx, Collector<Long> out) throws Exception {
            final var timerService = ctx.timerService();
            // this example uses a processing-time timer; use registerEventTimeTimer() for event-time delays
            timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + delayMs);
            // store the current value so that onTimer() can emit it later
            state.update(value);
        }
    
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<String, Long, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
            // emit the value stored under the current key (each event has its own random key, see getKey() below)
            out.collect(state.value());
            state.clear();
        }
    
        public static class KeySelectorImpl implements KeySelector<Long, String> {
            private static final long serialVersionUID = 1L;
    
            @Override
            public String getKey(Long value) {
                // a fresh random key per event, so every event gets its own state entry and timer
                return UUID.randomUUID().toString();
            }
        }
    }
    

    I tested this code locally and it works.
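
One thing to check before running this with parallelism above 1: Flink expects KeySelector results to be deterministic, because the selector can be evaluated more than once for the same record (when routing it and again when the keyed operator sets the current key). A random UUID key may therefore only be safe in a local, single-parallelism run. A possible alternative is to key by something stable and buffer the pending events per fire time instead of one value per key. The plain-Java model below sketches only that buffering logic, with no Flink dependency. `DelayBuffer`, `add` and `fire` are hypothetical names for illustration, not Flink APIs, and the timestamps are passed in by hand rather than taken from a timer service.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "buffer now, emit after a delay". Not a Flink API.
final class DelayBuffer {
    private final long delayMs;
    // Pending values grouped by the timestamp at which they become due.
    private final TreeMap<Long, List<Long>> pending = new TreeMap<>();

    DelayBuffer(Duration delay) {
        this.delayMs = delay.toMillis();
    }

    // Counterpart of processElement(): remember the value under its fire time.
    void add(long value, long nowMs) {
        pending.computeIfAbsent(nowMs + delayMs, t -> new ArrayList<>()).add(value);
    }

    // Counterpart of onTimer(): release every value whose fire time has passed.
    List<Long> fire(long nowMs) {
        List<Long> due = new ArrayList<>();
        Map<Long, List<Long>> ready = pending.headMap(nowMs, true);
        for (List<Long> values : ready.values()) {
            due.addAll(values);
        }
        ready.clear(); // clearing the view also removes the entries from pending
        return due;
    }

    public static void main(String[] args) {
        DelayBuffer buffer = new DelayBuffer(Duration.ofMinutes(1));
        buffer.add(1L, 0L);
        buffer.add(2L, 0L);      // same fire time as the first value; both are kept
        buffer.add(3L, 30_000L);
        System.out.println(buffer.fire(59_999L)); // nothing is due yet
        System.out.println(buffer.fire(60_000L)); // the values added at t = 0
        System.out.println(buffer.fire(90_000L)); // the value added at t = 30 s
    }
}
```

In a Flink job the same shape could be kept in keyed state under a stable key, for example a MapState from fire time to the list of pending values, with one timer registered per fire time. That mapping is an assumption of this sketch and was not part of the tested answer above.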