Different types of items arrive at a source, and I partition them into separate windows by their 'type'. Now each type's window has to be configured with its own emit timeout, rather than all types sharing one configuration such as .keyBy("type").window(TumblingProcessingTimeWindows.of(Time.seconds(1))).allowedLateness(Time.seconds(5)).
For example: { {"type-1", 5m}, {"type-2", 10m}, {"type-3", 1m} }
So elements added to the keyed window for type-1 are emitted after 5 minutes, those for type-2 after 10 minutes, and so on.
I can do this by keeping a list of times in state inside a KeyedProcessFunction and calling registerProcessingTimeTimer with the timeout configured for the key.
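To make the bookkeeping behind that idea concrete, here is a minimal framework-free sketch in plain Java. It is not Flink code: the class name PerTypeTimeoutSketch, the SLOTS map and the rule that the first item of a type starts that type's timeout are all assumptions made for illustration, and time is passed in explicitly instead of coming from a timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for "state + processing-time timer per key".
final class PerTypeTimeoutSketch {
    // Illustrative per-type timeouts in milliseconds (5 min and 1 min).
    static final Map<String, Long> SLOTS = Map.of("type-1", 300_000L, "type-3", 60_000L);

    private final Map<String, List<String>> buffers = new HashMap<>();
    private final Map<String, Long> deadlines = new HashMap<>();

    // Buffers an item. Assumption: only the first item of a type sets the deadline.
    // Types missing from SLOTS are not handled in this sketch.
    public void add(String type, String item, long now) {
        buffers.computeIfAbsent(type, k -> new ArrayList<>()).add(item);
        deadlines.putIfAbsent(type, now + SLOTS.get(type));
    }

    // Returns and clears every buffer whose deadline is at or before "now".
    public Map<String, List<String>> advanceTo(long now) {
        Map<String, List<String>> fired = new TreeMap<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= now) {
                fired.put(entry.getKey(), buffers.remove(entry.getKey()));
                it.remove();
            }
        }
        return fired;
    }
}
```

With items "b" and "c" added for type-3 at times 0 and 30000, advanceTo(60000) returns both of them as one batch, while a type-1 item added at time 0 stays buffered until advanceTo(300000). This is the part a window with a suitable trigger would take over.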
But instead of having to manage the state and elements myself, how can I do this using window? I tried a custom Trigger but couldn't get it to work properly.
Below is the code fix for my custom Trigger approach. I have not used the Evictor as suggested by @David; instead I return FIRE_AND_PURGE from onProcessingTime. This approach seems to work, having tested it in a few scenarios.
Slot configuration
// Per-type emit timeout in milliseconds.
private static final Map<String, Long> eventSlots = new HashMap<>();
static {
    eventSlots.put("type22", 30000L);
    eventSlots.put("type12", 15000L);
    eventSlots.put("type9", 10000L);
    eventSlots.put("type2", 20000L);
    eventSlots.put("type7", 13000L);
}
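One thing to watch with a plain map lookup: eventSlots.get(...) returns null for a type that is not in the map, and using that null in long arithmetic throws a NullPointerException. A small sketch of a guarded lookup follows. The class name EventSlots, the method timeoutFor and the fallback value DEFAULT_SLOT_MS are hypothetical and not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper around the slot configuration.
final class EventSlots {
    // Assumed fallback for unconfigured types; the value is illustrative only.
    static final long DEFAULT_SLOT_MS = 10_000L;

    private static final Map<String, Long> eventSlots = new HashMap<>();
    static {
        eventSlots.put("type22", 30000L);
        eventSlots.put("type12", 15000L);
        eventSlots.put("type9", 10000L);
        eventSlots.put("type2", 20000L);
        eventSlots.put("type7", 13000L);
    }

    private EventSlots() {
    }

    // Returns the configured timeout in milliseconds, or the fallback for unknown types.
    public static long timeoutFor(String type) {
        return eventSlots.getOrDefault(type, DEFAULT_SLOT_MS);
    }
}
```

The trigger could then call EventSlots.timeoutFor(element.getType()) instead of reading the map directly. Whether a fallback or a hard failure is better for unknown types depends on the job.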
Processing code
SingleOutputStreamOperator<Event> sourceStream = ...
sourceStream
    .keyBy("type")
    .window(GlobalWindows.create())
    .trigger(new Trigger<Event, GlobalWindow>() {
        @Override
        public TriggerResult onElement(Event element, long timestamp, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            // Every element schedules a timer at "now + slot configured for its type".
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + eventSlots.get(element.getType()));
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
                throws Exception {
            // Emit everything buffered for this key, then drop it.
            return TriggerResult.FIRE_AND_PURGE;
        }
        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
                throws Exception {
            // Event time is not used by this trigger.
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            // Note: GlobalWindow.maxTimestamp() is Long.MAX_VALUE, so this call does not
            // remove the per-element timers registered in onElement.
            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        }
    })
    .process(new ProcessWindowFunction<Event, Events, Tuple, GlobalWindow>() {
        @Override
        public void process(Tuple key,
                ProcessWindowFunction<Event, Events, Tuple, GlobalWindow>.Context ctx,
                Iterable<Event> elements, Collector<Events> out) throws Exception {
            // Collect the fired elements into a single Events batch.
            List<Event> events = new ArrayList<>();
            elements.forEach(events::add);
            out.collect(new Events(events));
        }
    });