Tags: java, scala, apache-flink, flink-streaming

Flink - when not to inject watermarks for windowing?


I learned that windowing of stream data uses watermarks as boundaries.


In the code below (taken from an answer), no watermark is injected into the stream generated by fromElements():

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Record( key: String, value: Int )

object Job extends App
{
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements( Record("01",1), Record("02",2), Record("03",3), Record("04",4), Record("05",5) )
  val step1 = data.filter( record => record.value % 3 != 0 ) // introduces some data loss
  val step2 = data.map( r => Record( r.key, r.value * 2 ) )
  val step3 = data.map( r => Record( r.key, r.value * 3 ) )
  val merged = step1.union( step2, step3 )
  val keyed = merged.keyBy(0)
  val windowed = keyed.timeWindow( Time.milliseconds( 50 ) )
  // CountTriggerWithTimeout is a custom trigger defined in the original answer
  val triggered = windowed.trigger( new CountTriggerWithTimeout( 3, env.getStreamTimeCharacteristic ) )
  val summed = triggered.sum( 1 )
  summed.print()
  env.execute("test")
}


In the code below, the Kafka events have been watermarked:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
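
The CustomWatermarkStrategy itself is not shown here; a minimal sketch of such a strategy (assuming a bounded-out-of-orderness generator over a hypothetical Event type with an eventTime field) could look like this:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Hypothetical event type carrying its own event-time timestamp
case class Event(key: String, value: Int, eventTime: Long)

object Watermarks {
  // Tolerates events arriving up to 5 seconds out of order: the watermark
  // lags the largest timestamp seen so far by that amount.
  val customStrategy: WatermarkStrategy[Event] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(e: Event, recordTimestamp: Long): Long = e.eventTime
      })
}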

Question: when should watermarks be injected, and when not?


Solution

  • Event time windows always need watermarks.

    In the case you mention with fromElements, it appears that there are no watermarks, but actually there are. Whenever a bounded source (such as fromElements) reaches the end of its input, it emits one final watermark with the value MAX_WATERMARK. This triggers all pending event-time timers, thus closing any open windows (a runnable sketch follows at the end of this answer).

    This is done to provide the semantics expected for batch jobs, which is that they produce their results after having consumed all of the input.
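
To make this concrete, here is a minimal sketch (assuming Flink 1.12+, where event time is the default time characteristic, and a hypothetical Event type with its own timestamp field). The strategy below emits no watermarks at all while the job runs, yet the event-time window still fires, because the bounded fromElements source emits MAX_WATERMARK when it reaches the end of its input:

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record carrying its own event-time timestamp
case class Event(key: String, value: Int, ts: Long)

object MaxWatermarkDemo extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val events = env
    .fromElements(Event("01", 1, 10L), Event("02", 2, 20L), Event("03", 3, 30L))
    // noWatermarks(): no watermarks are emitted during the run;
    // only per-record timestamps are assigned.
    .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .noWatermarks[Event]()
        .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
          override def extractTimestamp(e: Event, recordTs: Long): Long = e.ts
        })
    )

  events
    .keyBy(_.key)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(50)))
    .sum("value")
    .print() // fires anyway: the bounded source emits MAX_WATERMARK at end of input

  env.execute("max-watermark-demo")
}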