
How to understand lateness in Flink's event-time over windows


I have the following Flink SQL code (Flink 1.12) that computes an over-window aggregation.

The StockSource produces the following records. Each Stock record consists of three fields: id, price and create_date.

id1,1,2020-09-16T12:50:15
id1,2,2020-09-16T12:50:12
id1,3,2020-09-16T12:50:11
id1,4,2020-09-16T12:50:18
id1,5,2020-09-16T12:50:13
id1,6,2020-09-16T12:50:20
id1,8,2020-09-16T12:50:22
id1,9,2020-09-16T12:50:40

The code is as follows:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression}
import org.apache.flink.types.Row
import org.example.model.Stock
import org.example.sources.StockSource


object EventTimeOverWindowSqlTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val ds: DataStream[Stock] = env.addSource(new StockSource())
    val ds2 = ds.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[Stock] {
      var max = Long.MinValue

      override def checkAndGetNextWatermark(t: Stock, l: Long): Watermark = {
        if (t.trade_date.getTime > max) {
          max = t.trade_date.getTime
        }
        new Watermark(max - 4000) // watermark trails the largest timestamp seen so far by 4 seconds
      }

      override def extractTimestamp(t: Stock, l: Long): Long = t.trade_date.getTime
    })
    val tenv = StreamTableEnvironment.create(env)
    val table = tenv.fromDataStream(ds2, $"id", $"price", $"rt".rowtime())
    tenv.createTemporaryView("sourceTable", table)
    tenv.from("sourceTable").toAppendStream[Row].print()
    val sql =
      """
      select
        id,
        price,
        sum(price) OVER (PARTITION BY id ORDER BY rt rows between 2 preceding and current row) as sum_price
      from sourceTable
      """.stripMargin(' ')

    val table2 = tenv.sqlQuery(sql)

    table2.toAppendStream[Row].print()

    env.execute()

  }
}

When I run the above application, the output is:

id1,3,3
id1,2,5
id1,5,10
id1,1,8
id1,4,10
id1,6,11
id1,8,18

The price sequence in the result (3 2 5 1 4 6 8) follows create_date order, just like select price from sourceTable order by create_date (which would give 3 2 5 1 4 6 8 9).
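To check that reading, here is a plain-Scala sketch with no Flink dependency. It sorts the sample records by timestamp and applies the frame ROWS BETWEEN 2 PRECEDING AND CURRENT ROW for the single partition id1. The names OverWindowSumSketch, Stock (with a "HH:mm:ss" time string instead of a full timestamp) and rowsOverSum are hypothetical and exist only for this illustration:

```scala
// Plain-Scala model (no Flink dependency) of
//   SUM(price) OVER (PARTITION BY id ORDER BY rt ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
// applied to the eight sample records after sorting them by timestamp.
object OverWindowSumSketch {
  // Hypothetical stand-in for the question's Stock class; time is "HH:mm:ss" on 2020-09-16.
  final case class Stock(id: String, price: Int, time: String)

  val records: Seq[Stock] = Seq(
    Stock("id1", 1, "12:50:15"), Stock("id1", 2, "12:50:12"),
    Stock("id1", 3, "12:50:11"), Stock("id1", 4, "12:50:18"),
    Stock("id1", 5, "12:50:13"), Stock("id1", 6, "12:50:20"),
    Stock("id1", 8, "12:50:22"), Stock("id1", 9, "12:50:40"))

  /** Returns (price, sum_price) for each row, in rowtime order, for a single partition. */
  def rowsOverSum(rows: Seq[Stock], preceding: Int): Seq[(Int, Int)] = {
    val sorted = rows.sortBy(_.time) // zero-padded "HH:mm:ss" strings sort chronologically
    sorted.indices.map { i =>
      // The frame is the current row plus up to `preceding` earlier rows.
      val frame = sorted.slice(math.max(0, i - preceding), i + 1)
      (sorted(i).price, frame.map(_.price).sum)
    }
  }

  def main(args: Array[String]): Unit =
    rowsOverSum(records, preceding = 2).foreach { case (p, s) => println(s"id1,$p,$s") }
}
```

Its first seven lines match the output above. The eighth row (price 9, sum 23) does not appear in the streaming output, presumably because the watermark, which stays 4 seconds behind the largest timestamp seen, only reaches 12:50:36 and never passes 12:50:40.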

I have two questions:

  1. Explaining the result this way seems reasonable, but global ordering is impossible in a streaming application because new records keep arriving forever. There must be some mechanism that orders a limited set of the records Flink has seen and then kicks off the computation and emits output. How does that work?

  2. id1,5,2020-09-16T12:50:13 should be late, because it arrives after id1,4,2020-09-16T12:50:18 (it is 5 seconds late, more than the 4 seconds the watermark allows).
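Under the usual watermark definition (a watermark t asserts that no more records with timestamp <= t will arrive), the intuition in question 2 can be checked by replaying the punctuated assigner from the code above. This is a simplified model, not Flink code. WatermarkTraceSketch, arrivals, Step and trace are hypothetical names, and timestamps are expressed as seconds past 12:50:00:

```scala
// Replays the question's punctuated assigner (watermark = max timestamp seen - bound,
// emitted after every record) and flags records that arrive at or behind the watermark.
object WatermarkTraceSketch {
  // (price, seconds past 12:50:00) in the order the sample records arrive.
  val arrivals: Seq[(Int, Long)] =
    Seq(1 -> 15L, 2 -> 12L, 3 -> 11L, 4 -> 18L, 5 -> 13L, 6 -> 20L, 8 -> 22L, 9 -> 40L)

  final case class Step(price: Int, ts: Long, watermarkAfter: Long, behindWatermark: Boolean)

  def trace(rows: Seq[(Int, Long)], bound: Long): Seq[Step] = {
    var maxTs = Long.MinValue
    var watermark = Long.MinValue // no watermark emitted yet
    rows.map { case (price, ts) =>
      // Compare against the watermark emitted before this record arrived.
      val behind = ts <= watermark
      maxTs = math.max(maxTs, ts)
      watermark = math.max(watermark, maxTs - bound) // watermarks never move backwards
      Step(price, ts, watermark, behind)
    }
  }

  def main(args: Array[String]): Unit =
    trace(arrivals, bound = 4).foreach(println)
}
```

By this definition the 12:50:13 row (price 5) is late, since it arrives when the watermark is already 12:50:14. So is the 12:50:11 row (price 3), which arrives when the watermark is 12:50:11. Both nevertheless appear in the over-window output.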


Solution

  • The OVER window implementation delays processing each incoming row until all earlier rows have been processed. It does this by buffering arriving rows in state.

    "Late" rows are dropped, but the definition of lateness in this operator only includes rows earlier than the latest row that has been fully processed -- and a row is not processed until the watermark reaches the row's rowtime.

    The arrival of id1,4,2020-09-16T12:50:18 advances the watermark to 12:50:14, which triggers the processing of the buffered rows up to that time, including id1,2,2020-09-16T12:50:12. When the row at 12:50:13 arrives, the row at 12:50:12 is still the latest row that has been processed, so the 12:50:13 row is not late (according to this operator's notion of lateness).
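The behaviour described in this answer can be modelled in plain Scala. This is a simplified single-key sketch of the described semantics, not Flink's actual operator code. OverOperatorSketch, run and Result are hypothetical names, timestamps are seconds past 12:50:00, and the watermark trails the largest timestamp seen by 4 seconds, as in the question's assigner:

```scala
import scala.collection.mutable

// Simplified model of the described semantics (not Flink source code): rows are buffered
// by rowtime, processed in rowtime order once the watermark passes them, and dropped as
// late only if their rowtime is not after the last processed rowtime.
object OverOperatorSketch {
  // (price, seconds past 12:50:00) in arrival order.
  val arrivals: Seq[(Int, Long)] =
    Seq(1 -> 15L, 2 -> 12L, 3 -> 11L, 4 -> 18L, 5 -> 13L, 6 -> 20L, 8 -> 22L, 9 -> 40L)

  final case class Result(emitted: Seq[(Int, Int)], dropped: Seq[Int], stillBuffered: Seq[Int])

  def run(rows: Seq[(Int, Long)], bound: Long, preceding: Int): Result = {
    val buffer = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Int]] // rowtime -> prices
    val processed = mutable.ArrayBuffer.empty[Int] // prices in processing order
    val emitted = mutable.ArrayBuffer.empty[(Int, Int)] // (price, sum_price)
    val dropped = mutable.ArrayBuffer.empty[Int]
    var lastProcessedTs = Long.MinValue
    var maxTs = Long.MinValue
    var watermark = Long.MinValue

    for ((price, ts) <- rows) {
      // The operator's notion of "late": not after the latest fully processed rowtime.
      if (ts <= lastProcessedTs) dropped += price
      else buffer.getOrElseUpdate(ts, mutable.ArrayBuffer.empty[Int]) += price

      // The assigner emits a new watermark after every record.
      maxTs = math.max(maxTs, ts)
      val next = maxTs - bound
      if (next > watermark) { // buffered rows are only processed when the watermark advances
        watermark = next
        val ready = buffer.keys.takeWhile(_ <= watermark).toList
        for (t <- ready) {
          for (p <- buffer.remove(t).get) {
            processed += p
            // ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW
            emitted += (p -> processed.takeRight(preceding + 1).sum)
          }
          lastProcessedTs = t
        }
      }
    }
    Result(emitted.toList, dropped.toList, buffer.values.flatten.toList)
  }

  def main(args: Array[String]): Unit = {
    val r = run(arrivals, bound = 4, preceding = 2)
    r.emitted.foreach { case (p, s) => println(s"id1,$p,$s") }
    println(s"dropped: ${r.dropped}, still buffered: ${r.stillBuffered}")
  }
}
```

Run against the sample arrivals, this model emits exactly the seven rows shown in the question. It drops nothing, because the 12:50:13 row arrives while the last processed rowtime is 12:50:12, and it leaves the 12:50:40 row waiting in the buffer.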