apache-flink

How to get last value before current Window during Flink reduce calculation


I'm using a Flink 1.6 timeWindow. Suppose I have the following data points:

Timestamp, Value
12:55:00, 10
13:05:00, 12
13:30:00, 14
13:59:00, 13
14:02:00, 17

My time window is 13:00 (inclusive) to 14:00 (exclusive). How can I get the last value before this window, i.e. 12:55:00, 10, to use as the initial value for the current window's calculation?

Thanks.
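To make the requirement concrete, here is a small plain-Python sketch (not Flink code) that models hourly tumbling windows over the sample data and looks up, for a given window start, the latest value strictly before it. The helper names `parse`, `window_values`, and `carry_in` are hypothetical, and since the question gives only times of day, all timestamps are assumed to fall on the same day.

```python
from datetime import datetime, timedelta

# (timestamp, value) pairs from the question, in event-time order.
POINTS = [("12:55:00", 10), ("13:05:00", 12), ("13:30:00", 14),
          ("13:59:00", 13), ("14:02:00", 17)]

def parse(ts: str) -> datetime:
    # Only times of day are given, so every timestamp lands on one dummy date.
    return datetime.strptime(ts, "%H:%M:%S")

def window_values(points, start: datetime, size=timedelta(hours=1)):
    """Values in the tumbling window [start, start + size)."""
    return [v for ts, v in points if start <= parse(ts) < start + size]

def carry_in(points, start: datetime):
    """Value of the latest point strictly before the window start, or None."""
    earlier = [(parse(ts), v) for ts, v in points if parse(ts) < start]
    return max(earlier, key=lambda p: p[0])[1] if earlier else None
```

For the 13:00 window, `window_values` yields 12, 14 and 13, and `carry_in` yields 10, the value the question wants as the starting point.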


Solution

  • Broadly speaking, there are two ways you can approach this:

    1. some sort of custom windowing
    2. use a ProcessFunction instead

    Given the way the window API is designed, I don't see any way to accomplish this while using pre-aggregation (i.e., reduce or aggregate). I think you could do something with a ProcessWindowFunction and a custom Evictor, or by using the per-key state store returned by globalState() in ProcessWindowFunction.Context to keep state from one window to the next.
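As a rough illustration of the globalState idea, here is a plain-Python model (not Flink API) of a keyed ProcessWindowFunction: each window buffers all of its elements, and a per-key dictionary plays the role of the state store from `Context.globalState()`, carrying the latest value of one window into the next. Assumptions: the key `sensor-1` and the helpers `assign_windows` and `evaluate` are hypothetical, `min` stands in for whatever reduce the question has in mind, and windows of a key are assumed to fire oldest first.

```python
from collections import defaultdict
from functools import reduce

HOUR = 3600  # tumbling window size in seconds

# Hypothetical keyed input: (key, "HH:MM:SS", value), using the question's data.
EVENTS = [("sensor-1", "12:55:00", 10), ("sensor-1", "13:05:00", 12),
          ("sensor-1", "13:30:00", 14), ("sensor-1", "13:59:00", 13),
          ("sensor-1", "14:02:00", 17)]

def to_seconds(ts):
    h, m, s = map(int, ts.split(":"))
    return h * 3600 + m * 60 + s

def assign_windows(events):
    """Buffer every element per (key, window start), as a ProcessWindowFunction does."""
    windows = defaultdict(list)
    for key, ts, value in events:
        t = to_seconds(ts)
        windows[(key, t // HOUR * HOUR)].append((t, value))
    return windows

def evaluate(events, reduce_fn):
    # Stand-in for Context.globalState(): per key, survives from one window to the next.
    global_state = {}
    results = []
    windows = assign_windows(events)
    # Assumed firing order: by window start, i.e. as the watermark passes each end.
    for key, start in sorted(windows, key=lambda w: (w[1], w[0])):
        elements = sorted(windows[(key, start)])
        values = [v for _, v in elements]
        initial = global_state.get(key)
        if initial is None:
            acc = reduce(reduce_fn, values)  # no earlier window for this key yet
        else:
            acc = reduce(reduce_fn, values, initial)  # seed with last value before window
        results.append((key, start, initial, acc))
        global_state[key] = elements[-1][1]  # latest value, carried into the next window
    return results
```

With the question's data, the 13:00 window is seeded with 10 (from 12:55:00) and the 14:00 window with 13 (from 13:59:00). In a real job, state kept in globalState() is not cleaned up when a window is purged, so it needs its own cleanup strategy.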

    One issue you might run into: if no events fall into an hour (13:00 to 14:00, for example), then no window is created or evaluated for that hour. If that's a problem, further customization is needed.
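The gap problem shows up in a similar plain-Python model: an hour with no events never produces a result. One possible customization, sketched here under the assumption that an empty hour should simply report the carried-in value, is to also evaluate the empty hours between the first and last observed window. The function `hourly_results`, its `fill_gaps` flag, and the `GAPPY` input are hypothetical, and `max` stands in for the real reduce.

```python
from functools import reduce

HOUR = 3600  # tumbling window size in seconds

# Hypothetical input for one key, with nothing between 13:00 and 14:00.
GAPPY = [("12:55:00", 10), ("14:02:00", 17)]

def to_seconds(ts):
    h, m, s = map(int, ts.split(":"))
    return h * 3600 + m * 60 + s

def hourly_results(points, reduce_fn, fill_gaps=False):
    """Map window start -> reduced result for one key's (ts, value) points in time order."""
    buckets = {}
    for ts, v in points:
        buckets.setdefault(to_seconds(ts) // HOUR * HOUR, []).append(v)
    starts = sorted(buckets)
    if fill_gaps and starts:
        # Customization: also evaluate empty hours between the first and last window.
        starts = list(range(starts[0], starts[-1] + HOUR, HOUR))
    results, last = {}, None
    for start in starts:
        values = buckets.get(start, [])
        if last is None:
            acc = reduce(reduce_fn, values)  # the first window is never empty
        else:
            acc = reduce(reduce_fn, values, last)  # empty hour -> just the carried value
        results[start] = acc
        if values:
            last = values[-1]
    return results
```

Only hours between the first and last event are filled; trailing empty hours would need something that fires without new input, such as an event-time timer.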

    In most cases involving non-standard windowing, it's more straightforward and more performant to use a ProcessFunction. That way you have direct control over what state to keep, and the window triggering logic generally isn't very difficult to recreate.
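A ProcessFunction version might look roughly like the following plain-Python model. It is not Flink API: the class `HourlyCarryInFunction` and its `process_element`, `on_watermark`, and `on_timer` methods only mimic the shape of a keyed ProcessFunction, with instance attributes standing in for keyed state (a ValueState for the last value, a MapState of buffered elements per window end) and a set standing in for event-time timers registered at each window end. `min` is again a placeholder reduce, and late elements are not handled.

```python
from functools import reduce

HOUR = 3600  # tumbling window size in seconds

# The question's data points, as (seconds since midnight, value).
POINTS = [(12 * 3600 + 55 * 60, 10), (13 * 3600 + 5 * 60, 12),
          (13 * 3600 + 30 * 60, 14), (13 * 3600 + 59 * 60, 13),
          (14 * 3600 + 2 * 60, 17)]

class HourlyCarryInFunction:
    """Models one key's state; a real job would hold this in Flink keyed state."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.last_value = None  # like a ValueState: latest value of the last fired hour
        self.pending = {}       # like a MapState: window end -> [(ts, value), ...]
        self.timers = set()     # event-time timers, one per window end
        self.output = []        # emitted (window start, result) pairs

    def process_element(self, ts, value):
        window_end = ts // HOUR * HOUR + HOUR
        self.pending.setdefault(window_end, []).append((ts, value))
        self.timers.add(window_end)  # registering the same time twice is a no-op

    def on_watermark(self, watermark):
        # Fire every timer at or below the watermark, oldest first.
        due = sorted(t for t in self.timers if t <= watermark)
        for t in due:
            self.timers.discard(t)
            self.on_timer(t)

    def on_timer(self, window_end):
        elements = sorted(self.pending.pop(window_end))
        values = [v for _, v in elements]
        if self.last_value is None:
            acc = reduce(self.reduce_fn, values)
        else:
            acc = reduce(self.reduce_fn, values, self.last_value)
        self.output.append((window_end - HOUR, acc))
        self.last_value = elements[-1][1]  # carry the latest value into the next hour
```

Feeding the five points and then advancing the watermark to 14:00 emits results for the 12:00 and 13:00 hours, with the 13:00 result seeded by 10. Because the carried value lives in ordinary keyed state rather than in a window, its lifetime is under your direct control.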