source.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(PurgingTrigger.of(CountTrigger.of[TimeWindow](2)))
.process(new TestFun())
Explanation:
Let's say I have 3 events [E1, E2, E3] in one window. The window should fire either when a count is reached or when its time expires. I'm using a CountTrigger with a count of 2, so the window fires for E1 and E2, but the remaining event E3 is never emitted.
Expected: E3 should be emitted once the 5-second window ends. Actual: only E1 and E2 are emitted.
The CountTrigger you have supplied replaces the EventTimeTrigger that would normally be used with TumblingEventTimeWindows, rather than extending or augmenting it. To get the behavior you want, you'll have to implement a custom trigger that can FIRE a window based on both counting and timing out.
A Google search will turn up some examples and discussion.
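As a rough illustration of such a custom trigger, here is a minimal sketch that combines the counting logic of CountTrigger with the end-of-window timer of EventTimeTrigger. The names CountOrTimeoutTrigger and SumLongs are hypothetical and not part of Flink. The sketch assumes non-merging TimeWindows and assumes you want purging behavior on every firing, as PurgingTrigger gives you in the question. Treat it as a starting point rather than a tested implementation.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical helper: sums the per-window element count kept in trigger state.
class SumLongs extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue + b.longValue)
}

// Hypothetical trigger: fires and purges when maxCount elements have arrived,
// or when the watermark passes the end of the window, whichever comes first.
class CountOrTimeoutTrigger[T](maxCount: Long) extends Trigger[T, TimeWindow] {

  private val countDesc = new ReducingStateDescriptor[java.lang.Long](
    "count", new SumLongs, classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    if (window.maxTimestamp() <= ctx.getCurrentWatermark) {
      // The watermark is already past the window end: emit immediately.
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      // Same timer EventTimeTrigger would register for the end of the window.
      ctx.registerEventTimeTimer(window.maxTimestamp())
      count.add(1L)
      if (count.get().longValue >= maxCount) {
        count.clear()
        TriggerResult.FIRE_AND_PURGE
      } else {
        TriggerResult.CONTINUE
      }
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: TriggerContext): TriggerResult =
    // Timeout path: emit whatever is left in the window (E3 in the question).
    if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    ctx.getPartitionedState(countDesc).clear()
  }
}
```

In the pipeline from the question, this would replace the PurgingTrigger line with something like .trigger(new CountOrTimeoutTrigger[Any](2)). The timeout path only fires if watermarks advance past the end of the window, so the source still needs timestamps and watermarks assigned.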