Search code examples
Tags: apache-flink, flink-streaming, flink-cep

How can I detect the pattern a+b+ with Flink CEP?


Flink CEP is not working for my pattern. I have a sequence such as aabbbbaaaabbabb (a+b+), and I want the process function to produce output like this: {aabbbb} {aaaabb} {abb}

// The asker's original attempt over Jackson JsonNode events. It produces {aab} {aaaab} {ab}:
// "second" is the last element of the pattern, so nothing forces its b+ loop to keep
// consuming further B's once one B has completed the match.
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<JsonNode, JsonNode> attemptPattern = Pattern.<JsonNode>begin("first", skipStrategy)
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                // JsonNode's accessor is textValue() (not textvalue()). path() never returns null,
                // and textValue() is null for missing/non-text fields, so compare literal-first.
                return "A".equals(element.path("endpoint").textValue());
            }
        }).oneOrMore()
        .next("second")
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return "B".equals(element.path("endpoint").textValue());
            }
        }).oneOrMore();

My result:

{aab} {aaaab} {ab}


Solution

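  • You need to force the pattern to take all of the B's it can, rather than completing the match after the first one. One way to do that is to require that the run of B's be followed by a non-B event (the "end" pattern below). Note that the final segment is only emitted once such a terminating event arrives, which is why the example input ends with "x".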

    public class CEPExample {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // The trailing "x" is required: a run of b's is only known to be complete once a
            // non-b event arrives, so without it the last segment ("abb") would never be emitted.
            DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");
    
            // After a match, discard partial matches that started before the terminating "end"
            // event, so the next segment starts at that event.
            AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
            Pattern<String, String> pattern = Pattern.<String>begin("first", skipStrategy)
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String element) throws Exception {
                            return element.equals("a");
                        }
                    })
                    .oneOrMore()
                    // oneOrMore() uses relaxed internal contiguity by default, which would let
                    // unrelated events sit between the a's; consecutive() enforces a+ literally.
                    .consecutive()
                    .next("second")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String element) throws Exception {
                            return element.equals("b");
                        }
                    })
                    .oneOrMore()
                    .consecutive()
                    // Requiring a non-b event immediately after the b's forces "second" to absorb
                    // every b instead of completing the match after the first one.
                    .next("end")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String element) throws Exception {
                            return !element.equals("b");
                        }
                    });
    
            PatternStream<String> patternStream = CEP.pattern(events, pattern);
            patternStream.select(new SelectSegment()).print();
            env.execute();
        }
    
        /** Concatenates the a's and b's of one match; the terminating "end" event is not included. */
        public static class SelectSegment implements PatternSelectFunction<String, String> {
            @Override
            public String select(Map<String, List<String>> pattern) {
                return String.join("", pattern.get("first")) + String.join("", pattern.get("second"));
            }
        }
    
    }
    

    If you instead want to match a+b*, here's something that works (though I suspect a simpler solution exists):

    public class CEPExample {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // The trailing "x" terminates the last segment; without it "aaaa" would never be emitted.
            DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "x");
    
            // After a match, discard partial matches that started before the terminating "end"
            // event, so the next segment starts at that event.
            AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
            Pattern<String, String> pattern = Pattern.<String>begin("a-or-b", skipStrategy)
                    .where(new IterativeCondition<String>() {
                        @Override
                        public boolean filter(String element, Context<String> ctx) throws Exception {
                            if (element.equals("a")) {
                                return true;
                            }
                            // A segment must start with "a": accept a "b" only once something has
                            // already been collected (which, by induction, begins with an "a").
                            return element.equals("b") && ctx.getEventsForPattern("a-or-b").iterator().hasNext();
                        }
                    })
                    .oneOrMore()
                    .consecutive()
                    .next("end")
                    .where(new IterativeCondition<String>() {
                        @Override
                        public boolean filter(String element, Context<String> ctx) throws Exception {
                            // Any event other than "a" or "b" terminates the current segment.
                            if (!element.equals("a") && !element.equals("b")) {
                                return true;
                            }
                            // An "a" directly after a "b" begins a new segment, so it terminates this one.
                            String last = null;
                            for (String collected : ctx.getEventsForPattern("a-or-b")) {
                                last = collected;
                            }
                            return element.equals("a") && "b".equals(last);
                        }
                    });
    
            PatternStream<String> patternStream = CEP.pattern(events, pattern);
            patternStream.select(new SelectSegment()).print();
            env.execute();
        }
    
        /** Concatenates the a's and b's of one match; the terminating "end" event is not included. */
        public static class SelectSegment implements PatternSelectFunction<String, String> {
            @Override
            public String select(Map<String, List<String>> pattern) {
                return String.join("", pattern.get("a-or-b"));
            }
        }
    
    }
    

    For what it's worth, I generally find that MATCH_RECOGNIZE in Flink SQL offers a more straightforward DSL for pattern matching with Flink.