Search code examples
apache-flinkflink-streamingflink-cep

Flink Complex Event Processing


I have a flink cep code that reads from socket and detects for a pattern. Lets say the pattern(word) is 'alert'. If the word alert occurs five times or more, an alert should be created. But I am getting an input mismatch error. Flink version is 1.3.0. Thanks in advance !!

package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

    public class cep {

         public static void main(String[] args) throws Exception {


             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() {
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception {
                                return word.equals("alert");
                            }
                        })
                        .times(5);


                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .flatSelect((Map<String,List<String>> in, Collector<String> out) -> {

                            String first = in.get("first").get(0);

                            for (int i = 0; i < 6; i++ ) {

                                out.collect(first);

                            }


                        });

                alerts.print();

                env.execute();

            }

    }

enter image description here


Solution

  • So I have got the code to work. Here is the working solution,

        package pattern;
    
        import org.apache.flink.cep.CEP;
        import org.apache.flink.cep.PatternSelectFunction;
        import org.apache.flink.cep.PatternStream;
        import org.apache.flink.cep.pattern.Pattern;
        import org.apache.flink.cep.pattern.conditions.IterativeCondition;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;
    
        import java.util.List;
        import java.util.Map;
    
        public class cep {
    
             public static void main(String[] args) throws Exception {
    
    
                 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
                    DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);
    
                    dss.print();
    
                    Pattern<String,String> pattern = Pattern.<String> begin("first")
                            .where(new IterativeCondition<String>() {
                                @Override
                                public boolean filter(String word, Context<String> context) throws Exception {
                                    return word.equals("alert");
                                }
                            })
                            .times(5);
    
                    PatternStream<String> patternstream = CEP.pattern(dss, pattern);
    
                    DataStream<String> alerts = patternstream
                            .select(new PatternSelectFunction<String, String>() {
                                @Override
                                public String select(Map<String, List<String>> in) throws Exception {
    
                                    String first = in.get("first").get(0);
    
                                    if(first.equals("alert")){
    
                                        return ("5 or more alerts");
                                    }
                                    else{
    
                                        return (" ");
                                    }
                                }
                            });
    
                    alerts.print();
    
                    env.execute();
    
                }
    
        }