I have an input stream of events.
While processing them, I would like to send some of the events back to be reprocessed after several minutes.
Is there a way to achieve this?
Here is a simplified example of what I'm trying to achieve:
var delayedMessagesOutputTag = new OutputTag<Long>("delayedMessagesOutputTag") {};
var inputStream = env.fromElements(1L, 2L, 3L, 4L);
var resolvedElements = processEvents(delayedMessagesOutputTag, inputStream);
var delayedStream = resolvedElements
    .getSideOutput(delayedMessagesOutputTag)
    .process( /*** SOME MAGIC HERE TO CAUSE DELAY ***/);
var resolvedDelayedEvents = processEvents(delayedMessagesOutputTag, delayedStream);
resolvedElements
    .union(resolvedDelayedEvents)
    .addSink(new PrintSinkFunction<>());
env.execute();
....
private SingleOutputStreamOperator<Long> processEvents(OutputTag<Long> delayedMessagesOutputTag, DataStream<Long> inputStream) {
// Returns resolved events, write desired delayed events to delayedMessagesOutputTag
}
Any advice would be highly appreciated.
You can use Flink Timers:
...
final var delay = Duration.ofMinutes(1);
final var delayedStream = resolvedElements.getSideOutput(delayedMessagesOutputTag)
    .keyBy(new DelayedProcessFunction.KeySelectorImpl())
    .process(new DelayedProcessFunction(delay));
...
An example implementation of DelayedProcessFunction and KeySelectorImpl:
import java.time.Duration;
import java.util.UUID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedProcessFunction extends KeyedProcessFunction<String, Long, Long> {
    private static final long serialVersionUID = 1L;
    private final long delayMs;
    // Holds the delayed event for the current key until its timer fires.
    private transient ValueState<Long> state;

    public DelayedProcessFunction(Duration delay) {
        delayMs = delay.toMillis();
    }

    @Override
    public void open(Configuration parameters) {
        // Set up the keyed state that stores the delayed event.
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Long.class));
    }

    @Override
    public void processElement(Long value, KeyedProcessFunction<String, Long, Long>.Context ctx, Collector<Long> out) throws Exception {
        final var timerService = ctx.timerService();
        // This example uses a processing-time timer; registerEventTimeTimer() is the event-time alternative.
        timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + delayMs);
        // Store the value so that onTimer() can emit it later.
        state.update(value);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Long, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
        // Emit the value stored under the current key (see getKey() below), then free the state.
        out.collect(state.value());
        state.clear();
    }

    public static class KeySelectorImpl implements KeySelector<Long, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Long value) {
            // Random key for every event, so each event gets its own state entry and timer.
            // Caveat: Flink's KeySelector is documented as returning a deterministic key,
            // so a random key may misbehave when the parallelism is greater than 1.
            return UUID.randomUUID().toString();
        }
    }
}
I tested this code locally and it works.
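To make the timer idea easier to follow without a running Flink job, here is a minimal plain-Java sketch of the same "buffer the value, release it when its timer is due" logic. It does not use any Flink API: the class name DelayBufferSketch and the methods add() and advanceTo() are hypothetical and exist only for this illustration, and the clock is passed in by hand instead of coming from a timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java model of "register a timer, emit the value when it fires". Not Flink API. */
public class DelayBufferSketch {

    /** A buffered value together with the time at which it should be released. */
    private static final class Pending implements Comparable<Pending> {
        final long fireAtMs;
        final long value;

        Pending(long fireAtMs, long value) {
            this.fireAtMs = fireAtMs;
            this.value = value;
        }

        @Override
        public int compareTo(Pending other) {
            return Long.compare(fireAtMs, other.fireAtMs);
        }
    }

    private final long delayMs;
    // Min-heap ordered by release time, so the earliest timer is always at the head.
    private final PriorityQueue<Pending> timers = new PriorityQueue<>();

    public DelayBufferSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Roughly what processElement() does: buffer the value and schedule its release. */
    public void add(long value, long nowMs) {
        timers.add(new Pending(nowMs + delayMs, value));
    }

    /** Roughly what onTimer() does: release every value whose timer is due at nowMs. */
    public List<Long> advanceTo(long nowMs) {
        List<Long> released = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().fireAtMs <= nowMs) {
            released.add(timers.poll().value);
        }
        return released;
    }

    public static void main(String[] args) {
        DelayBufferSketch buffer = new DelayBufferSketch(60_000L);
        buffer.add(1L, 0L);
        buffer.add(2L, 10_000L);
        System.out.println(buffer.advanceTo(59_999L)); // nothing is due yet
        System.out.println(buffer.advanceTo(60_000L)); // the first value is due
        System.out.println(buffer.advanceTo(70_000L)); // the second value is due
    }
}
```

In the Flink version, the keyed state and the timer service play the role of this queue, with the difference that Flink checkpoints them along with the rest of the job state.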