apache-flink sliding-window flink-cep event-stream-processing

How to trigger a line of code in window based CEP


I have a stream of events and want to count the number of events in a specific period of time to detect event loss. My code is similar to the following:

DataStream<DataEvent> dataStream = ...;

dataStream
.windowAll(SlidingEventTimeWindows.of(Time.seconds(windowSize),Time.seconds(1)))
.process(new MyProcessWindowFunction());

and I defined the MyProcessWindowFunction class as:

public class MyProcessWindowFunction
        extends ProcessAllWindowFunction<DataEvent, String, TimeWindow> {

    @Override
    public void process(ProcessAllWindowFunction<DataEvent, String, TimeWindow>.Context context, Iterable<DataEvent> iterable, Collector<String> collector) throws Exception {
        long count = 0;
        for (DataEvent dataEvent : iterable) {
            count++;
        }
        // TODO: the event-loss condition on count belongs here (the open part of this question)
        collector.collect("Window: " + context.window() + " count: " + count);
    }
}

My question is how I can use the counted value in a comparison to detect event loss. If I understand correctly, this process function produces a stream of strings emitted through the collector. However, I want to take some action as soon as I find event loss at the end of each sliding window.

I appreciate any help. Best regards,


Solution

  • It sounds like you want to do something along these lines:

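    dataStream
      // sensorId is assumed to be a String field of DataEvent
      .map(event -> new Tuple2<>(event.sensorId, 1))
      // type hint needed because the lambda loses Tuple2's generic types
      .returns(Types.TUPLE(Types.STRING, Types.INT))
      .keyBy(t -> t.f0)
      .window(SlidingEventTimeWindows.of(Time.seconds(windowSize), Time.seconds(1)))
      .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1))
      // SensorShouldBeStopped is a user-defined FilterFunction<Tuple2<String, Integer>>
      .filter(new SensorShouldBeStopped())
      .addSink(...);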
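
To make the count-then-filter idea concrete without a Flink cluster, here is a rough plain-Java sketch of the same logic. It is not Flink API: it simply counts events per sliding window and keeps the windows whose count falls below a threshold, which is roughly what the reduce and filter steps do per key. The class name `SlidingWindowLossDetector`, the method `windowsWithLoss`, and the `expectedCount` threshold are all hypothetical names chosen for illustration, and the sketch assumes timestamps in whole seconds and one expected event per second.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowLossDetector {

    /**
     * Returns the start timestamps (in seconds) of all sliding windows
     * that contain fewer than expectedCount events.
     * Each window covers the half-open interval [start, start + windowSizeSec).
     */
    static List<Long> windowsWithLoss(long[] eventTimesSec, long windowSizeSec, long slideSec,
                                      long firstWindowStart, long lastWindowStart,
                                      int expectedCount) {
        List<Long> flagged = new ArrayList<>();
        for (long start = firstWindowStart; start <= lastWindowStart; start += slideSec) {
            long end = start + windowSizeSec;
            int count = 0;
            // Count the events that fall into this window.
            for (long t : eventTimesSec) {
                if (t >= start && t < end) {
                    count++;
                }
            }
            // Plays the role of the filter step: keep only windows with too few events.
            if (count < expectedCount) {
                flagged.add(start);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // One event per second is expected; the events at t=4 and t=5 are missing.
        long[] events = {0, 1, 2, 3, 6, 7, 8, 9};
        List<Long> flagged = windowsWithLoss(events, 3, 1, 0, 7, 3);
        System.out.println("Windows with event loss start at: " + flagged);
    }
}
```

In the Flink pipeline, whatever you attach after the filter (the sink, or another operator) only sees windows that failed the check, so that is the place to react to event loss.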