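import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// listen on a local socket; each line is expected to be "<word> <epoch seconds>"
val text = env.socketTextStream("localhost", 9999)
  .assignAscendingTimestamps(item => {
    val line = item.split(" ")
    // print the raw seconds value from the input
    println(line(1))
    // event time in milliseconds, shifted 3 seconds earlier
    line(1).toLong * 1000 - 3000
  })
// turn each line into (word, 1, seconds) and hand it to the ProcessFunction
text.map { eachInput =>
    val line = eachInput.split(" ")
    (line(0), 1, line(1))
  }
  .process(new SimpleProcessFunc)
  .print()
env.execute()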
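// body of SimpleProcessFunc.processElement(i, context, collector);
// sdf is presumably a SimpleDateFormat("MM/dd/yyyy HH:mm:ss"), matching the output below
val mark = context.timerService().currentWatermark()
val timestamp = context.timestamp()
// print the current watermark and this element's timestamp
println(sdf.format(mark) + "===> watermark ===>" + mark)
println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
collector.collect(i)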
1585977022
03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
2> (epoch,1,1585977022)
1585977034
04/04/2020 13:10:18===> watermark ===>1585977018999
04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
3> (montanin,1,1585977034)
1585977053
04/04/2020 13:10:30===> watermark ===>1585977030999
04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
4> (song,1,1585977053)
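Each "timestamp in context" is three seconds earlier than the number typed in, because the extractor passed to assignAscendingTimestamps multiplies the seconds by 1000 and subtracts 3000 ms. The sketch below reproduces that parsing in plain Scala, without Flink, assuming each input line is a word followed by a single space and an epoch-seconds value (for example `epoch 1585977022`, as the output suggests). The object and method names are hypothetical.

```scala
// Hypothetical plain-Scala sketch (no Flink) of the parsing done in the job above.
object InputParsing {
  // Same arithmetic as the timestamp extractor: seconds * 1000, minus 3000 ms.
  def eventTimestamp(item: String): Long = {
    val line = item.split(" ")
    line(1).toLong * 1000 - 3000
  }

  // Same tuple the map step builds: (word, 1, raw seconds string).
  def toTuple(item: String): (String, Int, String) = {
    val line = item.split(" ")
    (line(0), 1, line(1))
  }
}
```

Applied to `epoch 1585977022`, eventTimestamp gives 1585977019000, the first "timestamp in context" value printed above.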
Here's the logic behind the watermark values:
The initial watermark has the value Long.MIN_VALUE, i.e. -9223372036854775808; formatted as a date, that is the meaningless 03/12/292269055 on the first line of output.
Watermarks trail behind the stream element whose timestamp was used to create them, because a watermark for time t is a statement that the stream is complete up through t: no further elements with a timestamp at or before t are expected. So the stream element at time 1585977019000 precedes a watermark for time 1585977018999. Another element with timestamp 1585977019000 could still arrive after that watermark, so it would be wrong for the watermark to have the timestamp 1585977019000.
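A minimal sketch of that rule in plain Scala, modeled on (not copied from) the way an ascending-timestamps generator computes its watermark: track the largest timestamp seen and report one millisecond less, staying at Long.MinValue until the first event arrives. The class name AscendingWatermarkSketch is hypothetical.

```scala
// Hypothetical sketch of an ascending-timestamps watermark rule.
class AscendingWatermarkSketch {
  // Largest event timestamp seen so far; Long.MinValue means "no events yet".
  private var maxTimestamp: Long = Long.MinValue

  def onEvent(timestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, timestamp)

  // Before any event, stay at Long.MinValue (subtracting 1 would wrap around);
  // afterwards, trail the largest timestamp by 1 ms.
  def currentWatermark: Long =
    if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - 1
}
```

After onEvent(1585977019000L), currentWatermark is 1585977018999, the value the second line of output reports.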
The ascending timestamp watermark generator is a periodic watermark generator: by default it produces a new watermark every 200 ms, but only if the watermark has advanced.
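To make "only if the watermark has advanced" concrete, here is a hypothetical emitter in plain Scala. The class name and the idea of passing the generator in as a function are illustrative assumptions, not Flink's API; each call to onTimerTick() stands for one firing of the periodic timer.

```scala
// Hypothetical sketch of periodic watermark emission.
class PeriodicEmitterSketch(currentWatermark: () => Long) {
  // Last watermark forwarded downstream; starts at the initial Long.MinValue.
  private var lastEmitted: Long = Long.MinValue

  // Called on every timer tick: forward the generator's watermark only if it
  // is larger than the last one emitted, otherwise emit nothing.
  def onTimerTick(): Option[Long] = {
    val wm = currentWatermark()
    if (wm > lastEmitted) { lastEmitted = wm; Some(wm) } else None
  }
}
```

Ticks that find no new events therefore produce no watermark at all, rather than repeating the previous one.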
When you access the current watermark in a single-input ProcessFunction, you get the most recent watermark received by that instance. During processElement(), that watermark does not yet reflect whatever the watermark generator learned from the event now being passed to processElement(); that update happens later, after the 200 ms timer goes off. This is why, in the output above, each element sees the watermark derived from the element before it, and the first element sees Long.MIN_VALUE.
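The ordering can be simulated in plain Scala. This hypothetical sketch assumes, as in the run above, that the events are typed far enough apart for the 200 ms timer to fire between any two of them; it returns the watermark each element would see inside processElement().

```scala
// Hypothetical simulation (no Flink) of why processElement() sees a stale watermark.
object StaleWatermarkDemo {
  def watermarksSeen(timestamps: Seq[Long]): Seq[Long] = {
    var maxTs = Long.MinValue        // generator state: largest timestamp seen
    var downstreamWm = Long.MinValue // last watermark the ProcessFunction received
    timestamps.map { ts =>
      maxTs = math.max(maxTs, ts)    // the generator sees the event first...
      val seen = downstreamWm        // ...but processElement() reads the old watermark
      // later, the periodic tick forwards maxTs - 1 if it has advanced
      val next = maxTs - 1
      if (next > downstreamWm) downstreamWm = next
      seen
    }
  }
}
```

Fed the three context timestamps from the output (1585977019000, 1585977031000, 1585977050000), it returns Long.MinValue, 1585977018999 and 1585977030999, the same watermarks the job printed.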
For more about watermarks, see also the page on watermarks in the Flink training.