apache-flink complex-event-processing esper flink-cep

How to create batch or slide windows using Flink CEP?


I'm just starting with Flink CEP, coming from the Esper CEP engine. As you may know, Esper's query language (EPL) makes it easy to create a batch or sliding window that groups events and lets you apply aggregate functions (avg, max, min, ...) to them.

For example, the following statement creates a 5-second batch window and computes the average of the price attribute over all Stock events received within that window.

select avg(price) from Stock#time_batch(5 sec)

I would like to know how to implement this in Flink CEP. I'm aware that Flink CEP probably takes a different approach, so this may not be as simple as in Esper.

I have looked at the docs on time windows, but I haven't managed to combine those windows with Flink CEP. So, given the following code:

DataStream<Stock> stream = ...; // Consume events from Kafka

// Filtering events with negative price
Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start")
            .where(
                    new SimpleCondition<Stock>() {
                        public boolean filter(Stock event) {
                            return event.getPrice() >= 0;
                        }
                    }
            );

PatternStream<Stock> patternStream = CEP.pattern(stream, pattern);

/**
  CREATE A BATCH WINDOW OF 5 SECONDS IN WHICH
  I COMPUTE OVER THE AVERAGE PRICES AND, IF IT IS
  GREATER THAN A THRESHOLD, AN ALERT IS DETECTED

  return avg(allEventsInWindow.getPrice()) > 1;
*/  

DataStream<Alert> result = patternStream.select(
            new PatternSelectFunction<Stock, Alert>() {
                @Override
                public Alert select(Map<String, List<Stock>> pattern) throws Exception {
                    return new Alert(pattern.toString());
                }
            }
    );

How can I create a window that opens when the first event is received and averages all events arriving within the following 5 seconds? For example:

t = 0 seconds 
Stock(price = 1); (...starting batch window...)
Stock(price = 1);
Stock(price = 1);
Stock(price = 2);
Stock(price = 2);
Stock(price = 2);
t = 5 seconds     (...end of batch window...)
Avg = 1.5 => Alert detected!

The average after 5 seconds would be 1.5, which would trigger the alert. How can I code this?
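To pin down the semantics being asked for, independent of any engine, here is a minimal plain-Java sketch (no Flink or Esper involved). It assumes a batch opens at the first event that does not fit into the previous batch and covers the next 5 seconds. The `BatchWindowSketch` class, its `Stock` type and the individual timestamps are hypothetical; the timeline above only says the six events fall between t = 0 and t = 5 seconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of a batch window anchored at the first event. */
final class BatchWindowSketch {

    /** Hypothetical minimal event: arrival time in milliseconds and a price. */
    static final class Stock {
        final long timestampMillis;
        final double price;

        Stock(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    /**
     * Splits time-ordered events into batches covering [start, start + windowMillis),
     * where start is the timestamp of the first event of the batch, and returns
     * the average price of each batch in order.
     */
    static List<Double> batchAverages(List<Stock> events, long windowMillis) {
        List<Double> averages = new ArrayList<>();
        long windowStart = 0L;
        double sum = 0.0;
        int count = 0;
        for (Stock event : events) {
            // Close the current batch once an event falls outside of it.
            if (count > 0 && event.timestampMillis >= windowStart + windowMillis) {
                averages.add(sum / count);
                sum = 0.0;
                count = 0;
            }
            // The first event of a batch anchors the window.
            if (count == 0) {
                windowStart = event.timestampMillis;
            }
            sum += event.price;
            count++;
        }
        if (count > 0) {
            averages.add(sum / count); // flush the last, still open batch
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Stock> events = Arrays.asList(
                new Stock(0, 1), new Stock(1000, 1), new Stock(2000, 1),
                new Stock(3000, 2), new Stock(4000, 2), new Stock(4500, 2));
        double threshold = 1.0; // same threshold as in the comment block above
        for (double avg : batchAverages(events, 5000)) {
            System.out.println("avg = " + avg + (avg > threshold ? " => alert" : ""));
        }
        // prints: avg = 1.5 => alert
    }
}
```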

Thanks!


Solution

  • Flink's CEP library cannot express this behavior. I recommend computing the averages with Flink's DataStream or Table API instead; you can then apply CEP to the resulting stream of averages to generate further events.

    final DataStream<Stock> input = env
        .fromElements(
                new Stock(1L, 1.0),
                new Stock(2L, 2.0),
                new Stock(3L, 1.0),
                new Stock(4L, 2.0))
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Stock>(Time.seconds(0L)) {
            @Override
            public long extractTimestamp(Stock element) {
                return element.getTimestamp();
            }
        });
    
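    // 2 ms windows match the demo timestamps above; the question's case would use Time.seconds(5).
    final DataStream<Double> windowAggregation = input
        .timeWindowAll(Time.milliseconds(2))
        // Accumulator is (event count, sum of values); the result is sum / count.
        .aggregate(new AggregateFunction<Stock, Tuple2<Integer, Double>, Double>() {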
            @Override
            public Tuple2<Integer, Double> createAccumulator() {
                return Tuple2.of(0, 0.0);
            }
    
            @Override
            public Tuple2<Integer, Double> add(Stock value, Tuple2<Integer, Double> accumulator) {
                return Tuple2.of(accumulator.f0 + 1, accumulator.f1 + value.getValue());
            }
    
            @Override
            public Double getResult(Tuple2<Integer, Double> accumulator) {
                return accumulator.f1 / accumulator.f0;
            }
    
            @Override
            public Tuple2<Integer, Double> merge(Tuple2<Integer, Double> a, Tuple2<Integer, Double> b) {
                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
            }
        });
    
    // THRESHOLD is a constant to define yourself (the question compares against 1).
    final DataStream<Double> result = windowAggregation.filter((FilterFunction<Double>) value -> value > THRESHOLD);
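
One caveat: as far as I understand Flink's documented behavior, tumbling time windows are aligned to the epoch (optionally shifted by an offset), not to the first element received, so they do not start "from the first event" the way the question describes. The plain-Java sketch below (no Flink dependency) only mirrors that alignment arithmetic to show which events end up in the same window. `TumblingAlignmentSketch` and its methods are hypothetical names, and it assumes the first constructor argument of `Stock` in the code above is the event timestamp in milliseconds.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrates epoch-aligned tumbling windows with plain arithmetic. */
final class TumblingAlignmentSketch {

    /** Start of the epoch-aligned window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Average value per window start, for parallel arrays of timestamps and values. */
    static Map<Long, Double> averagesPerWindow(long[] timestamps, double[] values, long sizeMillis) {
        // windowStart -> {count, sum}, the same shape as the accumulator in the answer
        Map<Long, double[]> accumulators = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            double[] acc = accumulators.computeIfAbsent(start, k -> new double[2]);
            acc[0] += 1;
            acc[1] += values[i];
        }
        Map<Long, Double> averages = new TreeMap<>();
        accumulators.forEach((start, acc) -> averages.put(start, acc[1] / acc[0]));
        return averages;
    }

    public static void main(String[] args) {
        // Same timestamps and values as the fromElements(...) demo data above.
        long[] timestamps = {1L, 2L, 3L, 4L};
        double[] values = {1.0, 2.0, 1.0, 2.0};
        System.out.println(averagesPerWindow(timestamps, values, 2L));
        // prints: {0=1.0, 2=1.5, 4=2.0}
    }
}
```

With 2 ms windows the demo events fall into [0, 2), [2, 4) and [4, 6), so only the middle window averages two events. If windows must start at the first event, a custom window assigner or a process function with timers is probably needed, which is beyond this sketch.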