I learned that windowing of stream data uses watermarks as boundaries.
In the code below (taken from an answer), no watermark is injected into the generated stream (fromElements()):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Record(key: String, value: Int)

object Job extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val data = env.fromElements(Record("01", 1), Record("02", 2), Record("03", 3), Record("04", 4), Record("05", 5))
  val step1 = data.filter(record => record.value % 3 != 0) // introduces some data loss
  val step2 = data.map(r => Record(r.key, r.value * 2))
  val step3 = data.map(r => Record(r.key, r.value * 3))
  val merged = step1.union(step2, step3)
  val keyed = merged.keyBy(_.key) // case classes need a key selector; positional keyBy(0) only works for tuples
  val windowed = keyed.timeWindow(Time.milliseconds(50))
  val triggered = windowed.trigger(new CountTriggerWithTimeout(3, env.getStreamTimeCharacteristic))
  val summed = triggered.sum("value") // likewise, positional sum(1) only works for tuples
  summed.print()
  env.execute("test")
}
In the code below, the Kafka events are watermarked:
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
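The question does not show what CustomWatermarkStrategy contains. A minimal sketch of what such a strategy might look like, assuming a hypothetical Event type that carries its own event-time field and a tolerance of 5 seconds for out-of-order events (both are assumptions, not from the question):

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Hypothetical payload type; the question does not show the Kafka record schema.
case class Event(key: String, timestampMillis: Long)

object CustomWatermarkStrategy {
  // One common shape for a "custom" strategy: bounded out-of-orderness
  // plus a timestamp assigner that extracts event time from the payload.
  def apply(): WatermarkStrategy[Event] =
    WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5)) // assumed tolerance
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(e: Event, recordTs: Long): Long = e.timestampMillis
      })
}
```

With this shape, watermarks trail the highest timestamp seen so far by the configured bound, which is what lets event-time windows decide when the input for a window is complete.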
Question: when should watermarks be injected, and when not?
Event time windows always need watermarks.
In the case you mention with fromElements, it appears that there are no watermarks, but actually there are. Whenever a bounded source (such as fromElements) reaches the end of its input, it emits one final watermark with the value MAX_WATERMARK (i.e., Long.MAX_VALUE). This causes all pending timers to fire, thus closing any open windows.
This is done to provide the semantics expected for batch jobs, which is that they produce their results after having consumed all of the input.
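So: for a genuinely unbounded source with event-time windows, watermarks must be assigned explicitly, because no end-of-input MAX_WATERMARK will ever arrive. A minimal sketch against the Record stream from the question, assuming (purely for illustration, since Record has no timestamp field) that its value field can serve as the event time:

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Assumption: event timestamps arrive in ascending order, so the
// monotonous-timestamps strategy suffices; otherwise use
// forBoundedOutOfOrderness with a suitable tolerance.
val withWatermarks = data.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[Record]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Record] {
      override def extractTimestamp(r: Record, recordTs: Long): Long = r.value.toLong
    })
)
```

For a bounded source, this is optional for correctness of the final result: even without it, the end-of-input MAX_WATERMARK will eventually fire every window, which is exactly the batch-like semantics described above.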