I am somewhat confused by how Flink deals with late elements when watermarking on event time.
My understanding is that as Flink reads a stream of data, the watermark advances whenever it sees an element whose event time is larger than the current watermark. Then any window that covers only times strictly less than the watermark is triggered for evaluation (assuming no allowed lateness).
However, take this minimal example:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]): Unit = {
    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example S3 path
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2),
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply {
        (window, iter, collector: Collector[(Long, Long, String)]) =>
          collector.collect((window.getStart, window.getEnd, iter.map(_.value).toString()))
      }

    windows.print

    env.execute("TimeStampExample")
  }
}
The result of running this is:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
However, if my understanding is correct, the record with value 4 should not be included in the first window here, as the watermark should have advanced past that window's end once the record with value 3 was read.
Now I recognise this is a trivial example, but not understanding it is making it hard to follow more complicated flows.
Your understanding is basically correct, but there are a few more things going on here that need to be taken into account.
First of all, you've used assignAscendingTimestamps(), which can only be used when the event stream is perfectly ordered by timestamp, and that isn't the case here. You should see this warning when you run the application:
WARN org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor - Timestamp monotony violated: 1525132800000 < 1525132920000
The other factor at work here is that an AscendingTimestampExtractor does not update the current watermark for every passing stream element. It is an example of a periodic watermark generator, which injects a Watermark into the stream every n milliseconds, where n is set by ExecutionConfig.setAutoWatermarkInterval(...) and defaults to 200 ms. This is how event #4 sneaks into the first window: the watermark has not yet advanced past the end of that window by the time event #4 is processed.
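If you want to experiment with how often those periodic watermarks are emitted, the interval can be changed on the environment's ExecutionConfig. A minimal sketch (the 50 ms value is purely illustrative):

// Emit periodic watermarks more often than the default 200 ms.
// The 50 ms value is only an illustration; tune it for your job.
env.getConfig.setAutoWatermarkInterval(50)

Note that this alone doesn't change the fact that watermarks are generated periodically rather than for every event.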
To get the results you expect, you could implement a punctuated watermark generator configured to generate a watermark for every event:
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {

  // Use each event's own time field as its event-time timestamp.
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long =
    element.time

  // Emit a watermark after every element, set to that element's timestamp.
  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp)
}
which you would then use like this:
val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2),
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)
Now your example produces these results:
(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))
Event #4 has been dropped because it is late. This could be adjusted by relaxing the watermark generator so as to accommodate some amount of out-of-orderness. E.g.,
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  // Hold the watermark 200 seconds behind the latest timestamp,
  // so events up to 200 seconds out of order are not considered late.
  new Watermark(extractedTimestamp - 200000)
}
which then produces these results:
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
Or you could configure the windows to allow late events:
val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...
which then causes the first window to fire twice:
(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
Note that since processing Watermarks imposes some overhead, you wouldn't normally want to use punctuated watermarks in this way (with a Watermark for every event). For most applications, periodic watermarks based on a BoundedOutOfOrdernessTimestampExtractor are a better choice.
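For reference, a minimal sketch of such an extractor for this example (the class name BoundedAssigner and the 200-second bound are just illustrative assumptions, mirroring the lateness used above):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative periodic assigner: watermarks trail the largest timestamp seen so far
// by 200 seconds, so events up to 200 seconds out of order are not considered late.
class BoundedAssigner extends BoundedOutOfOrdernessTimestampExtractor[ExampleType](Time.seconds(200)) {
  override def extractTimestamp(element: ExampleType): Long = element.time
}

which you would wire in with .assignTimestampsAndWatermarks(new BoundedAssigner) in place of the punctuated assigner.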