apache-flink, flink-streaming

How to join two streaming Flink tables and preserve timestamp information


I have two (streaming) Tables, one with an event time column and one without. I would like to join these using the Table API, but haven't found a way to do this that also preserves the timestamp information.

Consider the following MWE that can be executed in the Scala REPL:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

val streamEnv: StreamExecutionEnvironment = senv  // senv is pre-bound by the Flink Scala shell
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)

// Source that attaches an explicit event timestamp to each record;
// 'ts.rowtime marks that column as the event time attribute.
val table1 = streamEnv.addSource(new SourceFunction[(Long, String)] {
  override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
    ctx.collectWithTimestamp((1L, "hello"), 1L)
  }
  override def cancel(): Unit = ???
}).toTable(tableEnv, 'ts.rowtime, 'column1)

// Source without timestamps; the resulting table has no event time attribute.
val table2 = streamEnv.addSource(new SourceFunction[(String, String)] {
  override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
    ctx.collect(("hello", "world"))
  }
  override def cancel(): Unit = ???
}).toTable(tableEnv, 'column2, 'column3)

def checkTable(table: Table): Unit = {
  table
    .toAppendStream[Row]
    .process(new ProcessFunction[Row, Int] {
      override def processElement(value: Row, ctx: ProcessFunction[Row, Int]#Context, out: Collector[Int]): Unit = {
        // ctx.timestamp() returns null when no event timestamp is attached;
        // the implicit Long unboxing then throws a NullPointerException.
        out.collect((ctx.timestamp() / 1000).toInt)
      }
    })
  streamEnv.execute()
}

checkTable(table1)

checkTable(table1.join(table2, 'column1 === 'column2).select('column1, 'column2, 'column3))

The first table clearly has an event time assigned, and hence the first call to checkTable succeeds. (Although it is strange that this only works when the .rowtime marker is explicitly supplied when creating the table from the DataStream.)

Calling checkTable on the join of the first and the second table results in

    Caused by: java.lang.NullPointerException
      at scala.Predef$.Long2long(Predef.scala:363)
      at $anon$1.processElement(<console>:81)
      at $anon$1.processElement(<console>:79)
      at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

i.e., ctx.timestamp() is null in ProcessFunction.processElement. I could enforce timestamps by calling, e.g., .assignAscendingTimestamps(...) on the join result (a rough sketch follows below), but I don't think this is safe since I don't know how the join affects the ordering. Is it possible to make this join work and preserve timestamps?
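
To illustrate, that workaround might look roughly like the sketch below. It assumes 'ts can still be selected into the join result and comes back as a java.sql.Timestamp at field index 0, neither of which I have verified:

// Unsafe workaround sketch: re-attach timestamps to the joined stream.
// Field index 0 and the java.sql.Timestamp type are unverified assumptions,
// and the join output is not guaranteed to be in ascending timestamp order.
val reassigned = table1
  .join(table2, 'column1 === 'column2)
  .select('ts, 'column1, 'column3)
  .toAppendStream[Row]
  .assignAscendingTimestamps(row =>
    row.getField(0).asInstanceOf[java.sql.Timestamp].getTime)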


Solution

  • A generic join operator is not able to preserve the event timestamp property because records can be joined in any order.

    A record entering the join operator on the left side might join with a record that was ingested from the right side two days ago. The same could happen the other way round, i.e., a record from the left side might wait for some time before a matching record from the right side arrives. There are no bounds that would allow the operator to emit meaningful watermarks. Hence, the event time property of all input records is lost, and they can only be treated as regular TIMESTAMP attributes.

    However, you can use a windowed join, i.e., basically an additional join condition that bounds the delay between the joined records:

    .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
    

    The intervals can be freely chosen. In this case, Flink can infer bounds on the timestamps and watermarks and is able to preserve the event time attributes.
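
    For concreteness, here is a minimal sketch of such a windowed join in the Scala Table API, modeled on the question's setup. Note that both inputs must carry a rowtime attribute (unlike table2 above), so the right-hand source attaches timestamps as well; the field names and interval bounds are illustrative.

    // Both inputs need a rowtime attribute for a windowed join, so the
    // right-hand source attaches timestamps too (unlike table2 above).
    val left = streamEnv.addSource(new SourceFunction[(Long, String)] {
      override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
        ctx.collectWithTimestamp((1L, "hello"), 1L)
      override def cancel(): Unit = {}
    }).toTable(tableEnv, 'ltime.rowtime, 'a)

    val right = streamEnv.addSource(new SourceFunction[(Long, String)] {
      override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
        ctx.collectWithTimestamp((1L, "hello"), 1L)
      override def cancel(): Unit = {}
    }).toTable(tableEnv, 'rtime.rowtime, 'd)

    // The bounded time predicate makes this a windowed join, so Flink can
    // infer watermarks for the output and 'ltime remains a valid event
    // time attribute after the join.
    val joined = left
      .join(right)
      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
      .select('a, 'ltime)

    // The earlier checkTable pattern should now find a non-null timestamp.
    checkTable(joined)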