Search code examples
triggerswindowapache-flinkflink-streamingearly-return

Early firing in Flink - how to emit early window results to a different DataStream with a trigger


I'm working with code that uses a tumbling window of one day, and would like to send early results to a different DataStream on an hourly basis. I understand that triggers are a way to go here, but don't really see how it would work.

The current code is as follows:

myStream
     .keyBy(..)
     .window(TumblingEventTimeWindows.of(Time.days(1)))
     .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

In my understanding, I should register a trigger, and then on its onEventTime method get a hold of a TriggerContext and I can send data to the labeled output from there. But how do I get the current state of MyAggregateFunction from there? Or would I need to my own computation here inside of onEventTime()?

Also, the documentation states that "By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner.". Would my one day window then still fire correctly, or do I need to trigger it somehow differently?

Another way of doing this is creating two different operators - one that windows by 1 hour, and another that windows by 1 day. Would triggers be a preferred approach to that?


Solution

  • Normally when I get to more complex behavior like this, I use a KeyedProcessFunction. You can aggregate (and save in state) hourly and daily results, set timers as needed, and use a side output for the hourly results versus the regular output for the daily results.