java scala apache-flink

Assigning timestamps and watermarks to two input streams that are later connected for dynamic alerting using 'EventTime'


I have a Flink program that takes in two streams: a data (sensor readings) stream and an alert rules stream. I broadcast the rules stream and connect it to the data stream to generate dynamic alerts. Everything works fine with ProcessingTime, but nothing works with EventTime. I have assigned timestamps and watermarks to my data stream and pass the rules stream through as is, since the rules stream only has records when a rule is added or updated. But no alerts are generated.

  1. How can one use 'EventTime' to generate alerts when two streams, one with timestamps and watermarks and one (broadcast) stream containing only rules, are connected and processed dynamically according to the rules?

  2. Do I necessarily need to assign timestamps and watermarks to my rules stream as well?

  3. My rules stream only has records when a rule is added or modified. Is there a workaround or a hack to overcome this situation?

Any help or suggestions would be appreciated.

What I tried: I used just one stream, the data stream, and generated alerts with hard-coded window rules. That works fine. But when I connect it with the rules stream, it fails to generate any alerts or output.

Everything is working fine with 'ProcessingTime' but not with 'EventTime'.

What I expect: when I connect a continuous data stream with a non-continuous rules stream, I expect my program to generate dynamic alerts using 'EventTime'.


Solution

  • This exercise in the Flink training covers exactly this case: https://training.ververica.com/exercises/taxiQuery.html. See the hints and the solution for details, but the approach taken there is to use this timestamp extractor / watermark generator on the stream with the rules:

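    import javax.annotation.Nullable;

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    // Once the two streams are connected, the Watermark of the KeyedBroadcastProcessFunction operator
    // will be the minimum of the Watermarks of the two connected streams. Our query stream has a default
    // Watermark at Long.MIN_VALUE, and this will hold back the event time clock of the
    // KeyedBroadcastProcessFunction, unless we do something about it.
    // (AssignerWithPeriodicWatermarks is deprecated in newer Flink versions in favor of WatermarkStrategy.)
    public static class QueryStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
        // Always report the largest possible watermark, so this input never holds back the operator.
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return Watermark.MAX_WATERMARK;
        }
    
        // The rules carry no event time of their own, so a constant timestamp is assigned.
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            return 0;
        }
    }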
    

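    Because the connected operator's watermark is the minimum of its two inputs' watermarks, always emitting Watermark.MAX_WATERMARK on the rules stream leaves the data stream completely in charge of the watermarks, which is what is wanted in this case.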
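    To make the min-of-two-watermarks reasoning concrete, here is a small, Flink-free Java simulation. It uses no Flink classes: TwoInputWatermarkDemo and its methods are hypothetical names, and the model is an assumption that covers only the rule described above (the operator's event-time clock is the minimum of the latest watermarks from both inputs, never moves backwards, and event-time timers fire once it passes them). It shows why alerts never fire when the rules stream stays at Long.MIN_VALUE, and why reporting MAX_WATERMARK (Long.MAX_VALUE) on that input fixes it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free model of a two-input operator (e.g. a KeyedBroadcastProcessFunction)
 * deriving its event-time clock from two inputs. Not Flink's actual implementation.
 */
public class TwoInputWatermarkDemo {

    // Same numeric values as the default (no watermark yet) and Flink's Watermark.MAX_WATERMARK.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private long dataWatermark = NO_WATERMARK_YET;
    private long rulesWatermark = NO_WATERMARK_YET;
    private long currentWatermark = NO_WATERMARK_YET;

    // Hypothetical pending event-time timers (e.g. the end of an alert window).
    private final List<Long> pendingTimers = new ArrayList<>();
    private final List<Long> firedTimers = new ArrayList<>();

    void registerTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    void onDataWatermark(long wm) {
        dataWatermark = Math.max(dataWatermark, wm);
        advance();
    }

    void onRulesWatermark(long wm) {
        rulesWatermark = Math.max(rulesWatermark, wm);
        advance();
    }

    // The operator's clock is the minimum of both inputs and only ever moves forward.
    private void advance() {
        long combined = Math.min(dataWatermark, rulesWatermark);
        if (combined > currentWatermark) {
            currentWatermark = combined;
            // Fire (and remove) every timer whose timestamp is <= the new watermark.
            pendingTimers.removeIf(t -> {
                if (t <= currentWatermark) {
                    firedTimers.add(t);
                    return true;
                }
                return false;
            });
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    List<Long> firedTimers() {
        return firedTimers;
    }

    public static void main(String[] args) {
        // Rules stream never reports a watermark: the clock stays at Long.MIN_VALUE.
        TwoInputWatermarkDemo stuck = new TwoInputWatermarkDemo();
        stuck.registerTimer(1_000L);
        stuck.onDataWatermark(5_000L);
        System.out.println("without rules watermark, fired: " + stuck.firedTimers());

        // Rules stream reports MAX_WATERMARK: the data stream alone drives the clock.
        TwoInputWatermarkDemo fixed = new TwoInputWatermarkDemo();
        fixed.registerTimer(1_000L);
        fixed.onRulesWatermark(MAX_WATERMARK);
        fixed.onDataWatermark(5_000L);
        System.out.println("with MAX_WATERMARK on rules, fired: " + fixed.firedTimers());
    }
}
```

    Running main prints an empty list for the first case and [1000] for the second: the timer at 1000 only fires once the rules input stops pinning the minimum at Long.MIN_VALUE.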