Flink combination of windowByTime and triggerByCount


source.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .trigger(PurgingTrigger.of(CountTrigger.of[TimeWindow](2)))
    .process(new TestFun())

Explanation:

Let's say I have 3 events [E1, E2, E3] in one window. The window should fire by count, and it should also fire by time. I'm using a CountTrigger of 2, so the window fires for the first two events (E1 and E2), but the remaining event E3 is never emitted.

Expected: E3 should be emitted when the 5-second window ends. Actual: only E1 and E2 are emitted.


Solution

  • The CountTrigger you supplied replaces the EventTimeTrigger that TumblingEventTimeWindows would use by default; it does not extend or augment it. As a result, nothing fires the window when the watermark passes the end of the window, so E3 is never emitted. To get the behavior you want, you'll have to implement a custom trigger that can FIRE a window both when the count is reached and when the window times out.

    A Google search will turn up some examples and discussion.
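
For illustration, a custom trigger along those lines might look roughly like the sketch below. It is a minimal, untested sketch, assuming the Flink DataStream API used from Scala with event-time TimeWindows. The names CountOrEventTimeTrigger and SumLongs are hypothetical and are not part of Flink. It purges on every firing, to match the PurgingTrigger in the question.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.api.common.typeutils.base.LongSerializer
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical helper: adds up the per-window element counter.
class SumLongs extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue + b.longValue)
}

// Hypothetical trigger: fires and purges when maxCount elements have
// arrived, and again when the watermark passes the end of the window.
class CountOrEventTimeTrigger[T](maxCount: Long) extends Trigger[T, TimeWindow] {

  // Per-key, per-window counter kept in Flink's partitioned trigger state.
  private val countDesc = new ReducingStateDescriptor[java.lang.Long](
    "count", new SumLongs, LongSerializer.INSTANCE)

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Make sure the window also fires when it times out.
    ctx.registerEventTimeTimer(window.maxTimestamp())

    val count = ctx.getPartitionedState(countDesc)
    count.add(java.lang.Long.valueOf(1L))
    if (count.get().longValue >= maxCount) {
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == window.maxTimestamp()) {
      // End of window: emit whatever is left (E3 in the question).
      ctx.getPartitionedState(countDesc).clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }

  // Processing time plays no role in this trigger.
  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteEventTimeTimer(window.maxTimestamp())
    ctx.getPartitionedState(countDesc).clear()
  }
}
```

With something like this in place, the .trigger(...) line in the question would become .trigger(new CountOrEventTimeTrigger[T](2)), where T stands for the element type of source. The end-of-window firing still depends on watermarks advancing, so the job needs timestamps and watermarks assigned on the stream.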