I am testing event time and watermarks in Flink. Below is my code.
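import java.text.SimpleDateFormat
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector

// MyEvent is not shown here; it is assumed to be: case class MyEvent(code: String, time: Long)
object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
    properties.setProperty("group.id", "enven-test")
    // Ask Flink to call getCurrentWatermark every 1 ms
    env.getConfig.setAutoWatermarkInterval(1L)
    val input = env.addSource(new FlinkKafkaConsumer011[String]("event-time-topic", new SimpleStringSchema(), properties))
    // Each record has the form "code,epochMillis"
    val inputMap = input.map(f => {
      val arr = f.split(",")
      val code = arr(0)
      val time = arr(1).toLong
      MyEvent(code, time)
    })
    val watermark = inputMap.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
    val window = watermark
      .keyBy(_.code)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new WindowFunctionTest)
    window.print()
    env.execute()
  }

  // Emits (key, count, first event time, last event time, window start, window end)
  class WindowFunctionTest extends WindowFunction[MyEvent, (String, Int, String, String, String, String), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[MyEvent], out: Collector[(String, Int, String, String, String, String)]): Unit = {
      val list = input.toList.sortBy(_.time)
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      out.collect((key, list.size, format.format(list.head.time), format.format(list.last.time), format.format(window.getStart), format.format(window.getEnd)))
    }
  }
}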
And below is the event-time and watermark generator:
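import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
  val maxOutOfOrderness = 10000L // 10 seconds
  var currentMaxTimestamp: Long = _
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  // Start from a non-null watermark so the println below cannot throw a
  // NullPointerException if an element arrives before the first periodic call.
  var watermark: Watermark = new Watermark(Long.MinValue)
  var timestamp: Long = _

  // Called once per element; prints the watermark as it was BEFORE this element is accounted for.
  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    timestamp = element.time
    currentMaxTimestamp = if (timestamp > currentMaxTimestamp) timestamp else currentMaxTimestamp
    println("timestamp:" + element.code + "," + element.time + "|" + format.format(element.time) + ", currentMaxTimestamp: " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) + ", watermark: " + format.format(watermark.getTimestamp))
    timestamp
  }

  // Called periodically (see setAutoWatermarkInterval); the watermark is truncated to whole seconds.
  override def getCurrentWatermark(): Watermark = {
    watermark = new Watermark((currentMaxTimestamp - maxOutOfOrderness) / 1000 * 1000)
    watermark
  }
}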
Here is some test data. As I understand it, the first window calculation should be triggered only after the watermark reaches 2016-04-27 19:34:25.000. However, the test result shows the calculation being triggered after watermark: 2016-04-27 19:34:24.000. Can anybody explain this?
I suggest you print the watermark in getCurrentWatermark, as well as in extractTimestamp. That should clarify what's happening.
The issue is that extractTimestamp is being called to extract the timestamp from the event with a timestamp at 19:34:35 -- the event that will cause the current watermark to advance to 19:34:25, thus triggering the window -- and you are printing the current watermark at this point. At the time the println in extractTimestamp is executed, the watermark hasn't yet been advanced to reflect this new event. But soon after extractTimestamp returns, getCurrentWatermark will be called, and this will advance the current watermark to 19:34:25, which will in turn trigger the window.
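To see this ordering without running a Flink job, here is a minimal stand-alone sketch. It does not use Flink at all: SimulatedAssigner and WatermarkOrderDemo are hypothetical names, the event times are made-up milliseconds relative to an arbitrary origin (34000 stands in for 19:34:34 and 35000 for 19:34:35), and the firing rule in windowFires (watermark >= window end - 1) reflects my understanding of how the event-time trigger decides, so treat it as an assumption.

```scala
// Stand-alone simulation of the call order; no Flink dependency.
class SimulatedAssigner {
  val maxOutOfOrderness = 10000L
  var currentMaxTimestamp = 0L
  var watermark = Long.MinValue // last watermark computed so far

  // Mirrors extractTimestamp: updates the max timestamp, but the watermark is still the old one.
  def extractTimestamp(eventTime: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTime)
    println(s"extractTimestamp:    event=$eventTime, watermark (not yet advanced)=$watermark")
    eventTime
  }

  // Mirrors getCurrentWatermark: only this call advances the watermark.
  def getCurrentWatermark(): Long = {
    watermark = (currentMaxTimestamp - maxOutOfOrderness) / 1000 * 1000
    println(s"getCurrentWatermark: watermark=$watermark")
    watermark
  }

  // Assumed firing rule: a window ending at windowEnd fires once the watermark reaches windowEnd - 1.
  def windowFires(windowEnd: Long): Boolean = watermark >= windowEnd - 1
}

object WatermarkOrderDemo {
  def main(args: Array[String]): Unit = {
    val assigner = new SimulatedAssigner
    val windowEnd = 25000L // window [20000, 25000)
    for (t <- Seq(34000L, 35000L)) {
      assigner.extractTimestamp(t)    // runs when the element arrives
      assigner.getCurrentWatermark()  // runs shortly afterwards, on the periodic timer
      println(s"window fires: ${assigner.windowFires(windowEnd)}")
    }
  }
}
```

In this sketch, the extractTimestamp call for the event at 35000 still reports a watermark of 24000, even though that same event is what moves the watermark to 25000 on the next getCurrentWatermark call and makes the window fire. That is the same pattern as the 19:34:24 line in your output.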