
Apache Flink Watermark behaviour for TwoInputStreamOperator operator


There are two data streams, each with a timestamp assigner and a watermark generator defined as follows.

val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

val streamB: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[B] {
            override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

When these two streams are connected in an operator, the minimum of the streamA and streamB watermarks acts as the watermark of the connecting operator.

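import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class CombineAB extends CoProcessFunction[A, B, C] {
   // Emit one C per A, stamped with the current wall-clock time in milliseconds
   override def processElement1(elem: A, ctx: CoProcessFunction[A, B, C]#Context, out: Collector[C]): Unit = {
        out.collect(C(elem.x, elem.y, System.currentTimeMillis()))
   }
   // Emit one C per B, stamped the same way
   override def processElement2(elem: B, ctx: CoProcessFunction[A, B, C]#Context, out: Collector[C]): Unit = {
        out.collect(C(elem.x, elem.y, System.currentTimeMillis()))
   }
}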

val streamC: DataStream[C] = streamA.connect(streamB)
      .process(new CombineAB)

The watermark of the CombineAB operator is the minimum of the watermarks of A and B. Based on that, the elements of type C are marked late or not.
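To make the minimum rule concrete, here is a minimal sketch in plain Scala with no Flink dependency. The class TwoInputWatermarkModel and its method names are hypothetical and exist only for illustration; the sketch assumes each input's watermark starts at Long.MinValue and never moves backwards, and it uses a simplified check that compares an element's timestamp directly with the combined watermark.

```scala
// Plain Scala model (not Flink code) of how a two-input operator could
// derive its event-time clock from the watermarks of its two inputs.
final class TwoInputWatermarkModel {
  // Assumption: each input is at Long.MinValue until its first watermark arrives.
  private var watermark1: Long = Long.MinValue
  private var watermark2: Long = Long.MinValue

  // The combined watermark is the minimum of the two input watermarks.
  def current: Long = math.min(watermark1, watermark2)

  // Watermarks never move backwards, so keep the largest value seen per input.
  def onWatermark1(ts: Long): Long = { watermark1 = math.max(watermark1, ts); current }
  def onWatermark2(ts: Long): Long = { watermark2 = math.max(watermark2, ts); current }

  // Simplified check: an element is behind the clock if its timestamp
  // is less than or equal to the combined watermark.
  def isBehindWatermark(elementTimestamp: Long): Boolean =
    elementTimestamp <= current
}
```

In this model the slower input holds the combined watermark back: if input 1 has reached 1500 and input 2 has reached 2000, the combined watermark is 1500, so an element with timestamp 1200 is behind it while an element with timestamp 1600 is not.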

But since we have not attached any timestamp assigner to C, does that mean none of the elements from the CombineAB operator are marked late, and hence windowing on C will not drop any late records?

Let's say we attach a timestamp assigner and watermark generator to C as follows. Does that mean the watermarks from A and B are completely ignored, and the watermark downstream of CombineAB depends only on the timestamp field of C and the out-of-orderness bound defined for C?

     streamC.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[C] {
            override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
              element.updatedTime
            }
          })
      )

Is there a way to attach the timestamp assigner to C while the watermark for CombineAB is still the minimum of A and B, so that elements of C are marked late based on C's assigned timestamp and the watermark of CombineAB?


Update: Refined implementation of CombineAB


Solution

  • A few points:

    forBoundedOutOfOrderness[A](Duration.ofSeconds(0)) is unusual. Any out-of-order events will be late. Why not use forMonotonousTimestamps()?

    The records produced by CombineAB will already have timestamps; there's no need to apply assignTimestampsAndWatermarks to this stream. The timestamp of any record emitted through the Collector is the timestamp of the incoming record that was being processed.

    If you do call assignTimestampsAndWatermarks on stream C, the incoming watermarks will be filtered out, and you'll need to generate new watermarks.
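The first point above can be illustrated with a small plain Scala model of a bounded-out-of-orderness watermark generator. This is a sketch, not Flink code: BoundedOutOfOrdernessModel and WatermarkDemo are hypothetical names, the formula (largest timestamp seen, minus the bound, minus 1 ms) is how Flink's built-in generator is understood to work, and the model assumes a watermark is produced after every event, whereas Flink emits watermarks periodically.

```scala
// Plain Scala model (not Flink code) of a bounded-out-of-orderness
// watermark generator.
final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
  // Initialised so that the watermark is Long.MinValue before any event.
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs + 1

  // Track the largest event timestamp seen so far.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // Assumed formula: largest timestamp seen, minus the bound, minus 1 ms.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrdernessMs - 1
}

object WatermarkDemo {
  // For each timestamp, in arrival order, report whether it was already
  // at or behind the watermark when it arrived.
  def lateFlags(boundMs: Long, timestamps: Seq[Long]): Seq[Boolean] = {
    val generator = new BoundedOutOfOrdernessModel(boundMs)
    timestamps.map { ts =>
      val late = ts <= generator.currentWatermark
      generator.onEvent(ts)
      late
    }
  }
}
```

Under these assumptions, WatermarkDemo.lateFlags(0, Seq(1000L, 1000L, 999L, 1001L)) flags only the 999 event, since it arrives after a larger timestamp has been seen; with a bound of 5 ms none of the four events is flagged. Equal timestamps are not flagged even with a zero bound, which is the same tolerance forMonotonousTimestamps() is meant to provide.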