Flink watermark generation


  1. This is a modified version of the word count example from the official site. It uses event time and listens on a port:
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // listen on the port
    val text = env.socketTextStream("localhost", 9999)
      .assignAscendingTimestamps(item => {
        val line = item.split(" ")
        // simply print the raw timestamp (in seconds)
        println(line(1))
        // event time in milliseconds, shifted back by 3 seconds
        line(1).toLong * 1000 - 3000
      })
  1. Apply the transformation below:
    // the processing pipeline
    text.map { eachInput =>
        val line = eachInput.split(" ")
        // (word, count, raw timestamp string)
        (line(0), 1, line(1))
      }
      .process(new SimpleProcessFunc)
      .print()
  1. The actual logic in the process function has not changed much:
    // inside processElement(i, context, collector)
    val mark = context.timerService().currentWatermark()
    val timestamp = context.timestamp()
    // print some information (sdf is presumably a SimpleDateFormat)
    println(sdf.format(mark) + "===> watermark ===>" + mark)
    println(sdf.format(timestamp) + "===> timestamp in context ===> " + timestamp)
    collector.collect(i)
  1. I use the command line to send data over the socket, but in the IDE console the watermark values look strange; there seems to be no logic behind how they are generated:
    1585977022
    03/12/292269055 00:47:04===> watermark ===>-9223372036854775808
    04/04/2020 13:10:19===> timestamp in context ===> 1585977019000
    2> (epoch,1,1585977022)
    1585977034
    04/04/2020 13:10:18===> watermark ===>1585977018999
    04/04/2020 13:10:31===> timestamp in context ===> 1585977031000
    3> (montanin,1,1585977034)
    1585977053
    04/04/2020 13:10:30===> watermark ===>1585977030999
    04/04/2020 13:10:50===> timestamp in context ===> 1585977050000
    4> (song,1,1585977053)

Solution

  • Here's the logic behind the watermark values:

    The initial watermark has the value Long.MIN_VALUE, i.e. -9223372036854775808. Formatted as a date, that is the odd-looking 03/12/292269055 00:47:04 in your first watermark line.

    Watermarks trail behind the stream element whose timestamp was used as the basis for creating them. A watermark makes a statement about the completeness of the stream up through a certain point in time: no further elements with timestamps at or before that time are expected. For the ascending-timestamps generator, the watermark is the largest timestamp seen so far minus 1. So the stream element at time 1585977019000 is followed by a watermark for time 1585977018999: since there could still be another stream element for time 1585977019000 after the watermark, it would be wrong for that watermark to have a timestamp of 1585977019000.

    The ascending-timestamps watermark generator is a kind of periodic watermark generator which, by default, produces a new watermark every 200 msec, but only if the watermark has advanced.

    When you access the current watermark in a single-input ProcessFunction, you get the most recent watermark received by that instance. During processElement(), that watermark does not yet reflect whatever the watermark generator learned from the event now being passed to processElement(); that watermark update happens later, after the 200 msec timer goes off. This is why each watermark in your log is based on the previous event: while processing the event with timestamp 1585977031000, the function still sees 1585977018999, i.e. one less than the previous event's timestamp of 1585977019000.

    For more about watermarks, see also the page on watermarks in the Flink training.
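To make the sequence in the log easier to follow, here is a minimal plain-Scala model of the mechanism described above. It does not use Flink at all: `WatermarkSim`, `AscendingWatermarkGenerator` and `watermarksSeenByProcess` are hypothetical names, and the model assumes the periodic timer fires exactly once between consecutive events (roughly what happens when lines are typed by hand into the socket, but not true in general).

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Flink-free model of an ascending-timestamps watermark
// generator plus the periodic emission described in the answer.
object WatermarkSim {

  final class AscendingWatermarkGenerator {
    private var maxTimestamp: Long = Long.MinValue

    def onEvent(timestamp: Long): Unit = {
      maxTimestamp = math.max(maxTimestamp, timestamp)
    }

    // Long.MinValue until the first event, then (largest timestamp seen) - 1
    def currentWatermark: Long =
      if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - 1
  }

  /** For each event, the watermark a downstream processElement() would see.
    * Assumption: the periodic timer fires once between consecutive events. */
  def watermarksSeenByProcess(timestamps: Seq[Long]): Seq[Long] = {
    val generator = new AscendingWatermarkGenerator
    var lastEmitted = Long.MinValue // latest watermark received downstream
    val seen = ArrayBuffer.empty[Long]
    for (ts <- timestamps) {
      generator.onEvent(ts)
      // the event is forwarded before the watermark update it causes
      seen += lastEmitted
      // later, the periodic timer fires: emit only if the watermark advanced
      val wm = generator.currentWatermark
      if (wm > lastEmitted) lastEmitted = wm
    }
    seen.toSeq
  }

  def main(args: Array[String]): Unit = {
    // event times as computed in the question: seconds * 1000 - 3000
    val input = Seq(1585977022L, 1585977034L, 1585977053L).map(_ * 1000 - 3000)
    input.zip(watermarksSeenByProcess(input)).foreach { case (ts, wm) =>
      println(s"timestamp $ts -> watermark seen in processElement $wm")
    }
  }
}
```

Fed the three inputs from the question, the model yields Long.MinValue, 1585977018999 and 1585977030999, the same watermarks printed in the console, which illustrates the one-event lag.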