There are two data streams, each with a timestamp assigner and watermark generator defined as follows:
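```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

// Event time of each A is its lastUpdatedMs field; no out-of-orderness allowed.
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
    .withTimestampAssigner(new SerializableTimestampAssigner[A] {
      override def extractTimestamp(element: A, recordTimestamp: Long): Long =
        element.lastUpdatedMs
    })
)

// Same strategy for B, using B's lastUpdatedMs field as event time.
val streamB: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
    .withTimestampAssigner(new SerializableTimestampAssigner[B] {
      override def extractTimestamp(element: B, recordTimestamp: Long): Long =
        element.lastUpdatedMs
    })
)
```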
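For intuition about what `forBoundedOutOfOrderness(Duration.ofSeconds(0))` produces, here is a simplified plain-Scala model (not Flink code; `BoundedOutOfOrdernessModel` is a hypothetical name). It mirrors the arithmetic of Flink's bounded-out-of-orderness generator: the watermark is the largest timestamp seen so far, minus the bound, minus 1 ms. As a simplification it updates the watermark immediately, whereas Flink emits it periodically.

```scala
// Simplified model of a bounded-out-of-orderness watermark generator.
// Hypothetical class, not the Flink API: it only mirrors the arithmetic.
final class BoundedOutOfOrdernessModel(outOfOrdernessMs: Long) {
  // Chosen so that the initial watermark is exactly Long.MinValue.
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMs + 1

  // Called for every element with its extracted event timestamp.
  def onEvent(timestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, timestamp)

  // Watermark derived from the largest timestamp seen so far.
  def currentWatermark: Long = maxTimestamp - outOfOrdernessMs - 1

  // An element is behind the watermark if its timestamp <= watermark.
  def isLate(timestamp: Long): Boolean = timestamp <= currentWatermark
}
```

With a bound of 0, an element with timestamp 1000 moves the watermark to 999, so any element that arrives afterwards with a smaller timestamp is already behind it.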
When these two streams are connected in an operator, the watermark of the connected operator is the minimum of the watermarks of `streamA` and `streamB`.
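```scala
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

class CombineAB extends CoProcessFunction[A, B, C] {
  override def processElement1(elem: A, ctx: CoProcessFunction[A, B, C]#Context, out: Collector[C]): Unit = {
    // updatedTime is the current wall-clock (processing) time, not an event timestamp
    out.collect(C(elem.x, elem.y, System.currentTimeMillis()))
  }

  override def processElement2(elem: B, ctx: CoProcessFunction[A, B, C]#Context, out: Collector[C]): Unit = {
    out.collect(C(elem.x, elem.y, System.currentTimeMillis()))
  }
}

val streamC: DataStream[C] = streamA
  .connect(streamB)
  .process(new CombineAB)
```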
The watermark of the `CombineAB` operator is the minimum of the watermarks of `A` and `B`. Based on that, elements of type `C` are marked late or not.
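The minimum rule can be sketched as a simplified plain-Scala model (hypothetical `TwoInputWatermarkModel`, not Flink code, and ignoring idle-input handling, which newer Flink versions also take into account). The operator remembers the last watermark from each input and forwards min(input 1, input 2) downstream only when that minimum increases.

```scala
// Simplified model of how a two-input operator (such as the one running
// CombineAB) tracks event time. Hypothetical class, not the Flink API.
final class TwoInputWatermarkModel {
  private var input1Watermark: Long = Long.MinValue
  private var input2Watermark: Long = Long.MinValue
  private var combined: Long = Long.MinValue

  def currentWatermark: Long = combined

  // Each returns Some(newWatermark) when the combined watermark advances,
  // i.e. when the operator would forward a watermark downstream.
  def onWatermark1(ts: Long): Option[Long] = { input1Watermark = ts; advance() }
  def onWatermark2(ts: Long): Option[Long] = { input2Watermark = ts; advance() }

  private def advance(): Option[Long] = {
    val newMin = math.min(input1Watermark, input2Watermark)
    if (newMin > combined) { combined = newMin; Some(combined) } else None
  }
}
```

One consequence: until both inputs have sent a watermark, nothing is forwarded, so a slow or silent stream holds back event time for the whole operator.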
But since we have not attached any timestamp assigner to `C`, does that mean none of the elements from the `CombineAB` operator are marked late? And hence windowing on `C` will not drop any late records?
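Elements are not tagged as late when they leave an operator; lateness is decided by the downstream operator that compares an element's timestamp with its own current watermark. A simplified model of the check an event-time window on `C` would apply, assuming tumbling windows with zero offset and non-negative timestamps (`WindowLatenessModel` is hypothetical, not Flink code):

```scala
// Simplified model of the lateness check of an event-time window operator.
// Assumptions: tumbling windows, zero offset, non-negative timestamps.
object WindowLatenessModel {
  // Start of the tumbling window of sizeMs that contains timestamp.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  // Last timestamp that still belongs to the window [start, start + size).
  def windowMaxTimestamp(timestamp: Long, sizeMs: Long): Long =
    windowStart(timestamp, sizeMs) + sizeMs - 1

  // Dropped as late when the window's max timestamp plus the allowed
  // lateness is already <= the operator's current watermark.
  def isDroppedAsLate(timestamp: Long, sizeMs: Long, allowedLatenessMs: Long, currentWatermark: Long): Boolean =
    windowMaxTimestamp(timestamp, sizeMs) + allowedLatenessMs <= currentWatermark
}
```

For example, an element with timestamp 1500 falls into the 1-second window [1000, 2000); it is dropped only once the watermark has reached 1999 (or 1999 plus any allowed lateness).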
Suppose we attach a timestamp assigner and watermark generator to `C` as follows. Does that mean the watermarks from `A` and `B` are completely ignored, and the watermark of `CombineAB` depends only on the timestamp field of `C` and the out-of-orderness bound defined for `C`?
```scala
streamC.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
    .withTimestampAssigner(new SerializableTimestampAssigner[C] {
      override def extractTimestamp(element: C, recordTimestamp: Long): Long =
        element.updatedTime
    })
)
```
Isn't there a way to attach the timestamp assigner to `C` while the watermark of `CombineAB` is still the minimum of `A` and `B`, so that elements of `C` are marked late based on `C`'s assigned timestamp and the watermark of `CombineAB`?
Update: Refined implementation of CombineAB
A few points:
`forBoundedOutOfOrderness[A](Duration.ofSeconds(0))` is unusual: any out-of-order events will be late. Why not use `forMonotonousTimestamps()`?
The records produced by `CombineAB` will already have timestamps; there's no need to apply `assignTimestampsAndWatermarks` to this stream. The timestamp of any record emitted through the `Collector` is the timestamp of the incoming record being processed.
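A simplified plain-Scala model of that rule (hypothetical `TimestampedRecord` and `PropagationModel`, not Flink's API): each output inherits the timestamp of the input record that produced it, whatever the user function writes into the value. So the `updatedTime` field that `CombineAB` puts into each `C` is just data; the record's event timestamp still comes from the `A` or `B` element being processed.

```scala
// Hypothetical types, not the Flink API: a record is a value plus the
// event timestamp the runtime carries alongside it.
final case class TimestampedRecord[T](value: T, timestamp: Long)

object PropagationModel {
  // Every output emitted while processing `in` gets the timestamp of `in`,
  // no matter what fields the user function writes into the value itself.
  def process[I, O](in: TimestampedRecord[I])(f: I => Seq[O]): Seq[TimestampedRecord[O]] =
    f(in.value).map(out => TimestampedRecord(out, in.timestamp))
}
```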
If you do call `assignTimestampsAndWatermarks` on stream `C`, the incoming watermarks will be filtered out, and you'll need to generate new watermarks.
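A simplified model of that filtering (hypothetical `ReassignedWatermarkModel`, not Flink code). It assumes the behaviour of Flink's timestamps-and-watermarks operator: upstream watermarks, such as the min of `A` and `B`, are discarded, except the final `Long.MaxValue` watermark that signals end of input, and new watermarks come only from timestamps extracted from `C`.

```scala
// Simplified model of the operator created by assignTimestampsAndWatermarks.
// Hypothetical class, not the Flink API.
final class ReassignedWatermarkModel(outOfOrdernessMs: Long) {
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMs + 1

  // Upstream watermarks are dropped, except the end-of-input watermark.
  def onUpstreamWatermark(ts: Long): Option[Long] =
    if (ts == Long.MaxValue) Some(Long.MaxValue) else None

  // New watermarks are derived only from the timestamps extracted from C.
  def onElement(extractedTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, extractedTimestamp)

  def currentWatermark: Long = maxTimestamp - outOfOrdernessMs - 1
}
```

In this model a low upstream watermark (say 500, held back by a slow `A`) has no effect once an element of `C` with `updatedTime` 5000 has been seen: the watermark jumps to 4999.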