apache-flink, stream-processing, windowing

How to gather late data in Flink Stream Processing Windowing


Consider a data stream whose records carry an event-time field. I want to group the incoming stream into windows of 8 milliseconds and reduce the data in each window. I do that with the following code:

aggregatedTuple
          .keyBy(0)
          .timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>() { ... });

Point: The key of the data stream is the processing-time timestamp rounded down to the nearest multiple of 8 milliseconds; for example, 1531569851297 is mapped to 1531569851296.
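
A minimal sketch of that mapping (the helper name toWindowKey is mine, for illustration):

    // Rounds a millisecond timestamp down to the start of its 8 ms bucket,
    // e.g. 1531569851297L -> 1531569851296L.
    static long toWindowKey(long timestampMillis) {
        return timestampMillis - (timestampMillis % 8);
    }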

But data may arrive late and end up in the wrong window. For example, suppose I set the window time to 8 milliseconds. If data enters the Flink engine in order, or at least with a delay of less than the window length (8 milliseconds), that is the best case. But suppose a record's event time (which is also a field in the record) lags by 30 milliseconds. It will then land in the wrong window, and I think that if I check the event time of every record as it is about to enter a window, I can filter out such late data. So I have two questions:

  • How can I filter the data stream as it is about to enter a window and check whether the data was created with the right timestamp for that window?
  • How can I gather such late data in a variable to do some processing on them?

Solution

  • Flink has two distinct but related abstractions that address different aspects of computing windowed analytics on streams with event-time timestamps: watermarks and allowed lateness.

    First, watermarks, which come into play whenever working with event-time data (whether or not you are using windows). Watermarks provide information to Flink about the progress of event-time, and give you, the application writer, a means of coping with out-of-order data. Watermarks flow with the data stream, and each one marks a position in the stream and carries a timestamp. A watermark serves as an assertion that at that point in the stream, the stream is now (probably) complete up to that timestamp -- or in other words, the events that follow the watermark are unlikely to be from before the time indicated by the watermark. The most common watermarking strategy is to use a BoundedOutOfOrdernessTimestampExtractor, which assumes that events arrive within some fixed, bounded delay.
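
    For instance, a watermark assigner for the stream in the question might look like the sketch below -- assuming the event timestamp lives in the tuple's f0 field and that events are at most 30 ms out of order (both assumptions; adjust to your data):

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<Tuple2<Long, JSONObject>> withTimestamps = aggregatedTuple
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, JSONObject>>(
                    Time.milliseconds(30)) {  // maximum expected out-of-orderness
                @Override
                public long extractTimestamp(Tuple2<Long, JSONObject> element) {
                    return element.f0;  // assumes f0 carries the event timestamp
                }
            });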

    This now provides a definition of lateness -- events that follow a watermark with timestamps less than the watermark's timestamp are considered late.

    The window API provides a notion of allowed lateness, which is set to zero by default. If allowed lateness is greater than zero, then the default Trigger for event-time windows will accept late events into their appropriate windows, up to the limit of the allowed lateness. The window will fire once at the usual time, and then again for each late event, up to the end of the allowed lateness interval, after which late events are discarded (or collected to a side output if one is configured).
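
    Applied to the 8 ms windows from the question, that could look roughly like this (the 30 ms lateness bound is an assumption chosen to match the scenario above):

    withTimestamps
        .keyBy(0)
        .timeWindow(Time.milliseconds(8))
        .allowedLateness(Time.milliseconds(30))  // accept events up to 30 ms past the watermark
        .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>() { ... });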

    How can I filter the data stream as it is about to enter a window and check
    whether the data was created with the right timestamp for that window?
    

    Flink's window assigners are responsible for assigning events to the appropriate windows -- the right thing will happen automatically. New window instances will be created as needed.

    How can I gather such late data in a variable to do some processing on them?
    

    You can be sufficiently generous in your watermarking to avoid having any late data, and/or configure the allowed lateness to be long enough to accommodate the late events. Be aware, however, that Flink will be forced to keep open every window that is still accepting late events, which delays garbage collecting old windows and may consume considerable memory.

    Note that this discussion assumes you want to work with time windows -- e.g. the 8 ms windows you are working with. Flink also supports count windows (e.g. grouping events into batches of 100), session windows, and custom window logic. Watermarks and lateness don't play any role if you are using count windows, for example.
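
    For comparison, a count window over the same keyed stream would look like this sketch (the batch size of 100 is taken from the example just mentioned):

    aggregatedTuple
        .keyBy(0)
        .countWindow(100)  // emit one reduced result per 100 elements, per key
        .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>() { ... });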

    If you want per-key results for your analytics, then use keyBy to partition the stream by key (e.g., by userId) before applying windowing. For example

    stream
      .keyBy(e -> e.userId)
      .timeWindow(Time.seconds(10))
      .reduce(...)
    

    will produce separate results for each userId.

    Update: Note that in recent versions of Flink it is now possible for windows to collect late events to a side output.
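
    A sketch of that, combining allowed lateness with a side output for events that arrive too late even for the lateness interval (the tag name "late-data" and the 30 ms bound are illustrative):

    import org.apache.flink.util.OutputTag;

    final OutputTag<Tuple2<Long, JSONObject>> lateTag =
        new OutputTag<Tuple2<Long, JSONObject>>("late-data") {};

    SingleOutputStreamOperator<Tuple2<Long, JSONObject>> reduced = withTimestamps
        .keyBy(0)
        .timeWindow(Time.milliseconds(8))
        .allowedLateness(Time.milliseconds(30))
        .sideOutputLateData(lateTag)  // events later than the allowed lateness go here
        .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>() { ... });

    // The rejected late events are then available as an ordinary stream:
    DataStream<Tuple2<Long, JSONObject>> lateStream = reduced.getSideOutput(lateTag);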

    Some relevant documentation:

    • Event Time and Watermarks
    • Allowed Lateness