I have a Flink program that takes in two streams: a data (sensor readings) stream and an alert rules stream. I broadcast the rules stream and connect it to the data stream to generate dynamic alerts. Everything works fine with ProcessingTime, but nothing works with EventTime. I have assigned timestamps and watermarks to my data stream and pass the rules stream through as is, because the rules stream only has records when a rule is added or updated. But no alerts are generated.
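For reference, the rule/data interaction described above (the broadcast state pattern) can be modeled without Flink. This is a minimal sketch that assumes simple threshold rules; `BroadcastRulesSketch`, `Rule`, `Reading`, `onRule`, and `onReading` are hypothetical names, not Flink API. `onRule` plays the role of `processBroadcastElement` writing broadcast state, and `onReading` plays the role of `processElement` reading it. It deliberately ignores event time, which is where the actual problem lies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Flink-free model of the broadcast state pattern (all names are hypothetical).
public class BroadcastRulesSketch {

    // Hypothetical rule: alert when a reading's value exceeds the threshold.
    static final class Rule {
        final String ruleId;
        final double threshold;
        Rule(String ruleId, double threshold) {
            this.ruleId = ruleId;
            this.threshold = threshold;
        }
    }

    // Hypothetical sensor reading carrying an event-time timestamp.
    static final class Reading {
        final String sensorId;
        final long timestamp;
        final double value;
        Reading(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Stands in for broadcast state: latest version of each rule, keyed by id.
    private final Map<String, Rule> rules = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();

    // Called only when the sparse rules stream emits an addition or update.
    void onRule(Rule rule) {
        rules.put(rule.ruleId, rule); // an update replaces the previous version
    }

    // Called for every element of the continuous data stream.
    void onReading(Reading r) {
        for (Rule rule : rules.values()) {
            if (r.value > rule.threshold) {
                alerts.add(rule.ruleId + ":" + r.sensorId + "@" + r.timestamp);
            }
        }
    }

    List<String> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        BroadcastRulesSketch op = new BroadcastRulesSketch();
        op.onReading(new Reading("s1", 1000L, 90.0)); // no rules yet: no alert
        op.onRule(new Rule("high-temp", 80.0));
        op.onReading(new Reading("s1", 2000L, 90.0)); // 90 > 80: alert
        op.onRule(new Rule("high-temp", 95.0));       // rule update
        op.onReading(new Reading("s1", 3000L, 90.0)); // 90 <= 95: no alert
        System.out.println(op.alerts());              // prints [high-temp:s1@2000]
    }
}
```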
How can I use 'EventTime' to generate alerts when two streams are connected, one with timestamps and watermarks and one broadcast stream that only carries rules, and the data is processed dynamically according to the rules?
Do I necessarily need to assign timestamps and watermarks to my rules stream as well?
My rules stream will only have records when a rule is added or modified. Is there a workaround or a hack to avoid or overcome this situation?
Any help or suggestions would be appreciated.
-- What I tried: I tried with just one stream, i.e. the data stream, generating alerts with hard-coded window rules, and that works fine. But when I connect it with the rules stream, it does not generate any alerts/output.
Everything works fine with 'ProcessingTime' but not with 'EventTime'.
-- What I expect: I expect my program to generate dynamic alerts using 'EventTime' when I connect a continuous data stream with a non-continuous rules stream.
This exercise in the Flink training covers exactly this case: https://training.ververica.com/exercises/taxiQuery.html. See the hints and the solution for details, but the approach taken there is to use this timestamp extractor / watermark generator on the stream with the rules:
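```java
import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
// will be the minimum of the Watermarks of the two connected streams. Our query stream has a default
// Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
// KeyedBroadcastProcessFunction, unless we do something about it.
// (AssignerWithPeriodicWatermarks is the pre-1.11 API; later Flink versions deprecate it
// in favor of WatermarkStrategy.)
public static class QueryStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    // Always report the largest possible watermark, so this input never holds back
    // the event-time clock of the connected operator.
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }
    // Query elements just get timestamp 0; only the watermark above matters here.
    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}
```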
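The "minimum of the two watermarks" rule in the comment can be checked with a small Flink-free model. This is a simplified sketch, not Flink's implementation: `TwoInputWatermarkSketch` and its methods are hypothetical names, each input starts at `Long.MIN_VALUE` as the comment describes, and `Long.MAX_VALUE` stands in for `Watermark.MAX_WATERMARK` (whose timestamp is `Long.MAX_VALUE`).

```java
// Simplified, Flink-free model of a two-input operator's event-time clock
// (hypothetical class, not Flink API).
public class TwoInputWatermarkSketch {

    // An input that has never emitted a watermark sits at Long.MIN_VALUE.
    private long dataWatermark = Long.MIN_VALUE;
    private long rulesWatermark = Long.MIN_VALUE;

    // Simplification: ignore any watermark that would move an input backwards.
    void onDataWatermark(long wm) {
        dataWatermark = Math.max(dataWatermark, wm);
    }

    void onRulesWatermark(long wm) {
        rulesWatermark = Math.max(rulesWatermark, wm);
    }

    // The operator's event-time clock is the minimum over both inputs.
    long combinedWatermark() {
        return Math.min(dataWatermark, rulesWatermark);
    }

    public static void main(String[] args) {
        // Rules stream without any watermark assigner: clock stays at the minimum.
        TwoInputWatermarkSketch stuck = new TwoInputWatermarkSketch();
        stuck.onDataWatermark(5_000L);
        System.out.println(stuck.combinedWatermark()); // prints -9223372036854775808

        // Rules stream pinned at the maximum: the data stream drives the clock.
        TwoInputWatermarkSketch fixed = new TwoInputWatermarkSketch();
        fixed.onRulesWatermark(Long.MAX_VALUE);
        fixed.onDataWatermark(5_000L);
        System.out.println(fixed.combinedWatermark()); // prints 5000
    }
}
```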
This has the effect of leaving the other stream completely in charge of the watermarks, which is what is wanted in this case.
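Why this brings the alerts back: event-time timers (and event-time windows) only fire once the operator's watermark reaches their timestamp. The Flink-free sketch below is a simplified model under that assumption; `EventTimeTimerSketch` and its methods are hypothetical names, not Flink API. It fires every registered timer whose time is at or below the combined watermark and compares a rules input pinned at `Long.MAX_VALUE` with one left at `Long.MIN_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified, Flink-free model of event-time timers on a two-input operator
// (hypothetical class, not Flink API).
public class EventTimeTimerSketch {

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();
    private final long rulesWatermark; // fixed watermark of the rules input
    private long dataWatermark = Long.MIN_VALUE;

    EventTimeTimerSketch(long rulesWatermark) {
        this.rulesWatermark = rulesWatermark;
    }

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    void onDataWatermark(long wm) {
        dataWatermark = Math.max(dataWatermark, wm);
        long combined = Math.min(dataWatermark, rulesWatermark);
        // Fire, in time order, every timer at or below the combined watermark.
        while (!timers.isEmpty() && timers.peek() <= combined) {
            fired.add(timers.poll());
        }
    }

    List<Long> fired() {
        return fired;
    }

    public static void main(String[] args) {
        // Rules input pinned at the maximum, as with Watermark.MAX_WATERMARK.
        EventTimeTimerSketch pinned = new EventTimeTimerSketch(Long.MAX_VALUE);
        pinned.registerEventTimeTimer(1_000L);
        pinned.registerEventTimeTimer(3_000L);
        pinned.onDataWatermark(2_000L);
        System.out.println(pinned.fired()); // prints [1000]

        // Rules input with no watermarks: nothing ever fires.
        EventTimeTimerSketch unassigned = new EventTimeTimerSketch(Long.MIN_VALUE);
        unassigned.registerEventTimeTimer(1_000L);
        unassigned.onDataWatermark(2_000L);
        System.out.println(unassigned.fired()); // prints []
    }
}
```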