I am using a pipeline as follows:
    inputStream.keyBy(<keyMapper>).
        connect(configurationBroadcastStream).
        process(new KeyedBroadcastProcessFunction<...>() {
            processBroadcastElement(...){...}
            processElement(...){...}
        }).
        keyBy(<keyMapper>). // have to key output of process() again
        window(DynamicEventTimeSessionWindow.withDynamicGap(...)).
        trigger(new CustomTrigger()).
        process(new CustomProcessWindowFn())
In the CustomTrigger(), I am registering an eventTimeTimer() that will fire to indicate the end of my window. The problem is that the onEventTime() method is never invoked, even though I have set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), I am using an ascendingTimestampExtractor(), and I have sent an event that has definitely pushed the watermark far enough that the eventTimeTimer() should fire. What am I missing? Does it have something to do with missing watermarks and the onTimer() method of the KeyedBroadcastProcessFunction? I suspect so because of David Anderson's comment in this answer:
add special fake watermarks for the non-broadcast stream (set to Watermark.MAX_WATERMARK)
and the fact that I have not implemented a method named onTimer. However, if this indeed is the case, I don't understand how that would be relevant to a downstream Trigger. Thanks.
Edit: a full example of this scenario is here.
Yes, the problem is that the broadcast stream doesn't have watermarks. (But no, it doesn't matter whether the KeyedBroadcastProcessFunction has an onTimer method or not. Once you get the watermarks flowing, they will flow through to the window regardless.)
Whenever an operator has two or more inputs (so in your case, when the inputStream and configurationBroadcastStream are connected), the watermark at that operator will be the minimum of the watermarks from its inputs. Since the broadcast stream doesn't have watermarks, this is holding back the watermarks provided by the inputStream.
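To make the "minimum of the inputs" rule concrete, here is a small stand-alone model of it. This is not Flink code: the class WatermarkMinDemo and its methods are hypothetical names made up for illustration, and the only assumption carried over from Flink is that an input that has never emitted a watermark sits at Long.MIN_VALUE.

```java
// Simplified model of a two-input operator's event-time clock.
// Hypothetical illustration only; none of these names are Flink API.
final class WatermarkMinDemo {

    // Each input starts at Long.MIN_VALUE until it emits a watermark.
    private long keyedInputWatermark = Long.MIN_VALUE;
    private long broadcastInputWatermark = Long.MIN_VALUE;

    // Watermarks only move forward, so keep the largest value seen per input.
    void onKeyedInputWatermark(long timestamp) {
        keyedInputWatermark = Math.max(keyedInputWatermark, timestamp);
    }

    void onBroadcastInputWatermark(long timestamp) {
        broadcastInputWatermark = Math.max(broadcastInputWatermark, timestamp);
    }

    // The operator's watermark is the minimum across its inputs.
    long currentWatermark() {
        return Math.min(keyedInputWatermark, broadcastInputWatermark);
    }

    public static void main(String[] args) {
        WatermarkMinDemo operator = new WatermarkMinDemo();

        // The keyed input advances, but the broadcast input never emits a watermark.
        operator.onKeyedInputWatermark(10_000L);
        operator.onKeyedInputWatermark(20_000L);

        // The operator's clock is still stuck at the broadcast input's initial value.
        System.out.println(operator.currentWatermark() == Long.MIN_VALUE); // prints "true"
    }
}
```

However far the keyed input advances, the combined value cannot move until the other input reports a watermark too, which is why downstream event-time timers never see time pass.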
I have an example showing how you might handle this. Assuming that your broadcast stream doesn't need to have any timing information, you can implement a timestamp extractor and watermark assigner that effectively cedes control of watermarking to the other stream. Something like this:
    import javax.annotation.Nullable;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
    // will be the minimum of the Watermarks of the two connected streams. Our config stream has a default
    // Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
    // KeyedBroadcastProcessFunction, unless we do something about it.
    public static class ConfigStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // Always report the largest possible watermark, so this stream never holds back the minimum.
            return Watermark.MAX_WATERMARK;
        }

        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            // Config elements carry no meaningful event time.
            return 0;
        }
    }
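As a rough illustration of why this unblocks the trigger, the sketch below models an event-time timer on a two-input operator in plain Java. It is not Flink code: TimerFiringDemo and timerFires are hypothetical names, and it assumes the usual rule that a timer fires once the operator's watermark reaches or passes the timer's timestamp. Long.MAX_VALUE stands in for the timestamp carried by Watermark.MAX_WATERMARK.

```java
// Simplified model of when an event-time timer fires on a two-input operator.
// Hypothetical illustration only; none of these names are Flink API.
final class TimerFiringDemo {

    // A timer fires once the operator watermark (the minimum of both inputs)
    // reaches or passes the timer's timestamp.
    static boolean timerFires(long timerTimestamp, long keyedWatermark, long broadcastWatermark) {
        long operatorWatermark = Math.min(keyedWatermark, broadcastWatermark);
        return operatorWatermark >= timerTimestamp;
    }

    public static void main(String[] args) {
        long timer = 5_000L;          // end of the window, in event time
        long keyedWatermark = 10_000L; // the input stream is already past the timer

        // Broadcast stream without watermarks: the timer is never reached.
        System.out.println(timerFires(timer, keyedWatermark, Long.MIN_VALUE)); // prints "false"

        // Broadcast stream pinned at the maximum: the keyed stream alone drives the clock.
        System.out.println(timerFires(timer, keyedWatermark, Long.MAX_VALUE)); // prints "true"
    }
}
```

With the broadcast side pinned at the maximum, the minimum is always whatever the keyed stream reports, so timers further downstream fire exactly when that stream's watermark says they should.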