scala apache-flink flink-streaming

Flink windows and late events


I have a use case where I need to handle late events differently from normal events: if an event arrives after its window has closed, it should be sent down a separate path.

I thought that .sideOutputLateData(..) would solve this for me, and it does under normal circumstances (i.e. with real-world data). But when I test it with fabricated data, it stops working.

I expect that something like:

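import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time

val env = StreamExecutionEnvironment.createLocalEnvironment()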
env.setParallelism(1)

val events: DataStream[(Int, Long)] = env.fromElements(
  (1, 1),
  (1, 15),
  (1, 25),
  (1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")

val windowedSum = events
  .assignAscendingTimestamps(e => e._2)
  .windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
  .sideOutputLateData(lateEvents)
  .sum(position=0)

val lateEventsStream = windowedSum
  .getSideOutput(lateEvents)
  // Handle differently
  .map(e => (e._1 + 100, e._2))

windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")

would result in:

[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101,8)

Instead I get:

[info] (2,1)
[info] (1,15)
[info] (1,25)

If I use a socketTextStream as source with the same data, it works as expected.

This tells me that somehow the watermark is not advancing as it should with very fast data inputs. I tried adjusting setAutoWatermarkInterval to a very small value, but with no luck.

Am I missing something? How can I test my job?


Solution

  • Thanks to @Dominik Wosinski for pointing me in the right direction. For anyone else lost in the intricacies of the Flink documentation, I'll post my solution here:

    As suspected, the problem was that the fast input data never advanced the watermark. This is because, by default, Flink only checks every 200 ms (the auto-watermark interval) whether the watermark should advance. You can shorten this interval (at the cost of extra load on the system) using:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setAutoWatermarkInterval(10) // or even lower
    

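    Evidently even this is not enough for an input as fast as a 4-element list: presumably all four elements are processed before the first periodic watermark is emitted, so none of them is ever considered late.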

    The solution is to emit a watermark at each event (note that this is not recommended in a production environment).
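
    To see why the timing of watermark emission matters, here is a minimal sketch in plain Scala. It is a toy model, not Flink code: the object LatenessSketch and the function lateEvents are hypothetical names, and it assumes non-negative timestamps, tumbling windows, zero allowed lateness, and the rule that an element is late once the watermark has reached the last timestamp of its window.

    ```scala
    // Toy model of event-time lateness; this is NOT Flink code.
    object LatenessSketch {
      type Event = (Int, Long) // (value, event timestamp in ms)

      /** Returns the events that would count as late for tumbling windows of
        * `windowSizeMs` with zero allowed lateness.
        *
        * watermarkPerEvent = true  models a generator that emits a watermark
        *                           equal to the timestamp of every event.
        * watermarkPerEvent = false models a periodic generator that never got
        *                           to fire before the bounded input ended.
        */
      def lateEvents(
          events: Seq[Event],
          windowSizeMs: Long,
          watermarkPerEvent: Boolean
      ): Seq[Event] = {
        var watermark = Long.MinValue
        val late = Seq.newBuilder[Event]
        for (event <- events) {
          val ts = event._2
          // Tumbling window [windowStart, windowStart + windowSizeMs)
          val windowStart = ts - (ts % windowSizeMs)
          val windowMaxTimestamp = windowStart + windowSizeMs - 1
          // The element is late if the watermark already reached the window's end.
          if (windowMaxTimestamp <= watermark) late += event
          // Advance the watermark only in the per-event model.
          if (watermarkPerEvent) watermark = math.max(watermark, ts)
        }
        late.result()
      }
    }
    ```

    With the four elements from the question and a 10 ms window, the per-event model flags only (1, 8) as late, while the model in which the watermark never advances flags nothing. That matches the (2,1) I was seeing, where the elements with timestamps 1 and 8 were summed into the same window.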

    To implement such a solution, we need to implement the WatermarkGenerator interface:

    import org.apache.flink.api.common.eventtime.{Watermark, WatermarkGenerator, WatermarkOutput}

    class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {
    
      override def onEvent(
          event: (Int, Long),
          eventTimestamp: Long,
          output: WatermarkOutput
      ): Unit = {
        // emit at every event
        output.emitWatermark(new Watermark(event._2))
      }
    
      // do nothing at AutoWatermarkInterval
      override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
    }
    

    To assign this generator to a stream, we first need to create a WatermarkStrategy:

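    import org.apache.flink.api.common.eventtime.{WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkStrategy}

    class MyStrategy extends WatermarkStrategy[(Int, Long)] {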
      override def createWatermarkGenerator(
          context: WatermarkGeneratorSupplier.Context
      ): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
    }
    

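    (This class can also implement the optional createTimestampAssigner method. Without it, records keep whatever timestamp they already carry, so timestamps have to be assigned elsewhere when the source provides none.)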

    And then we can use it in the stream:

    eventsStream
       .assignTimestampsAndWatermarks(new MyStrategy())
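
    For completeness, here is a sketch of what a strategy with the optional createTimestampAssigner method mentioned above might look like. The class name MyStrategyWithTimestamps is hypothetical, and the sketch assumes that the second tuple field is the event time in milliseconds and that the records carry no timestamp yet (as far as I can tell, that is the case for fromElements).

    ```scala
    import org.apache.flink.api.common.eventtime.{
      TimestampAssigner,
      TimestampAssignerSupplier,
      Watermark,
      WatermarkGenerator,
      WatermarkGeneratorSupplier,
      WatermarkOutput,
      WatermarkStrategy
    }

    // Hypothetical variant of MyStrategy that also assigns timestamps.
    class MyStrategyWithTimestamps extends WatermarkStrategy[(Int, Long)] {

      // Same per-event watermark generator as above, written inline.
      override def createWatermarkGenerator(
          context: WatermarkGeneratorSupplier.Context
      ): WatermarkGenerator[(Int, Long)] =
        new WatermarkGenerator[(Int, Long)] {
          override def onEvent(
              event: (Int, Long),
              eventTimestamp: Long,
              output: WatermarkOutput
          ): Unit = output.emitWatermark(new Watermark(event._2))

          // do nothing at AutoWatermarkInterval
          override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
        }

      // Assumption: the second tuple field holds the event time in ms.
      override def createTimestampAssigner(
          context: TimestampAssignerSupplier.Context
      ): TimestampAssigner[(Int, Long)] =
        new TimestampAssigner[(Int, Long)] {
          override def extractTimestamp(
              element: (Int, Long),
              recordTimestamp: Long
          ): Long = element._2
        }
    }
    ```

    It would be passed to assignTimestampsAndWatermarks in the same way as MyStrategy. As before, emitting a watermark on every event is meant for tests, not for production.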