Search code examples
scalaflink-cep

FlinkCEP: Can I reference an earlier event to define a subsequent match?


Here is a simple example:

val pattern = 
   Pattern.begin[Event]("start").where(_.getId == 42).
   next("middle").subtype(classOf[SubEvent]).where(x => x.getVolume == **first event matched**.getVolume) ...

Essentially the second event ("middle") need to access the state of the first event ("start"). Is it possible to do this within FlinkCEP without requiring an external state?


Solution

  • Sure. You can get events by for a specific pattern with the help of Context.

    new IterativeCondition<Event>() {
                private static final long serialVersionUID = 8061969839441121955L;
    
                @Override
                public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                    double sum = 0.0;
                    for (Event e : ctx.getEventsForPattern("middle")) {
                        sum += e.getPrice();
                    }
                    return sum > 5.0;
                }
            }