Search code examples
javaapache-flinkcomplex-event-processing

Apache Flink CEP pattern detection with Java


What I want to do:
Start a CEP pattern when an incoming event matches any element of any ArrayList stored in the map, and then continue matching only against the remaining elements of the ArrayList the match started with.
Map and pattern structure:

// Candidate item sets, keyed by an integer id.
final Map<Integer, ArrayList<String>> deger = new HashMap<>();
deger.put(1, new ArrayList<>(Arrays.asList("h:1", "l:1", "g:0")));
deger.put(2, new ArrayList<>(Arrays.asList("h:1", "l:1", "g:1")));
deger.put(3, new ArrayList<>(Arrays.asList("h:2", "l:3", "g:1")));
deger.put(4, new ArrayList<>(Arrays.asList("h:0", "l:2", "g:2")));

// Append the item sets to temp1 by looking up the keys 1..deger.size() in ascending order.
for (int key = 1; key <= deger.size(); key++) {
    temp1.add(deger.get(key));
}

// Three-stage pattern over String events: "start", then "middle", then "end",
// chained with followedBy. Each stage has its own filter condition.
Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                new SimpleCondition<String>() {
//                    @Override
                    // "start": accept the event if it equals any string in any of the
                    // lists held in temp1.
                    public boolean filter(String value) throws Exception {

                        for (ArrayList<String> aa: temp1){
                            for (String dd : aa)
                                if(value.equals(dd)){ 
                                    return true;
                                }
                        }
                        return false;
                    }
                }
        ).followedBy("middle").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        // NOTE(review): the "start" condition iterates temp1 as a collection of
                        // ArrayList<String>, so temp1.get(1) is a whole list, not a single string.
                        // String.equals(list) is always false, so this stage can never match.
                        // It also ignores which list the "start" stage matched on.
                        return value.equals(temp1.get(1));
                    }
                }
        ).followedBy("end").where(
                new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        // NOTE(review): same issue as in "middle" - a String is compared with
                        // the list at index 2, which is always false.
                        return value.equals(temp1.get(2));
                    }
                }
        );

My aim is to raise an alarm once all elements of one of the ArrayLists in the map have been seen. The order of the elements is not important, because they can arrive in any order in the stream. Once a match has started with an element of a given ArrayList, I want to continue with the remaining elements of that same ArrayList and, at the end, be able to report which ArrayList was matched. For example:

Incoming data = "l:1","h:1","g:0"
my pattern = "h:1","l:1","g:0" 
Start -> l:1 find
Middle -> g:0 or h:1 | h:1 find
End -> g:0 find -> alarm

Solution

  // Class-level fields (declare these in the enclosing class, outside the method body).
  // They are scratch state shared by the pattern conditions and the select function.
  // NOTE(review): static mutable fields are shared by every partial match; this presumably
  // only behaves as intended in a single-parallelism local run - confirm before relying on it.
  public static Integer temp1;   // key of the item set most recently completed by the "end" stage
  public static Map<Integer, ArrayList<String>> temp2 = new HashMap<>();   // key -> elements still to be matched
  public static boolean flag;    // result holder assigned inside each condition's filter()

  // Method-level setup: the item sets to detect, keyed by an integer id.
  final Map<Integer, ArrayList<String>> deger = new HashMap<>();
  deger.put(1, new ArrayList<>(Arrays.asList("h:1", "g:1", "s:0")));
  deger.put(2, new ArrayList<>(Arrays.asList("h:1", "g:1", "g:0")));
  deger.put(3, new ArrayList<>(Arrays.asList("h:1", "c:0", "g:0")));
  deger.put(4, new ArrayList<>(Arrays.asList("h:1", "s:1", "g:0")));
    
    
                // Three-stage pattern ("start" -> "middle" -> "end") that consumes one element of an
                // item set per stage, in any order. The elements still missing for each started item
                // set are tracked in temp2 (key -> remaining elements).
                // NOTE(review): flag is assigned below but not declared in this snippet; it has to be
                // a boolean field visible to these anonymous classes.
                Pattern<String,?> pattern = Pattern.<String>begin("start").where(
                        new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String value) throws Exception {
                                // "start": for every item set in deger that contains the event and is
                                // not yet tracked in temp2, track a copy of the set without this event.
                                // Returns true if at least one set was newly tracked.
                                flag = false;
                                for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {
                                    if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){
                                            // Copy first so the original list in deger is left untouched.
                                            ArrayList<String> newList = new ArrayList<String>();
                                            newList.addAll(entryStart.getValue());
                                            newList.remove(value);
                                            temp2.put(entryStart.getKey(),newList);
                                            flag = true;
                                    }
                                }
                                return flag;
                            }
                        }
                ).followedBy("middle").where(
                        new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String middle) throws Exception {
                                // "middle": for every tracked set that still has exactly two elements
                                // and contains the event, replace it with a copy without this event.
                                flag = false;
                                for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {
                                    if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){
                                        ArrayList<String> newListMiddle = new ArrayList<String>();
                                        newListMiddle.addAll(entryMiddle.getValue());
                                        newListMiddle.remove(middle);
                                        // The key is already present, so this put only replaces the value.
                                        temp2.put(entryMiddle.getKey(),newListMiddle);
                                        flag = true;
                                    }
                                }
                                return flag;
                            }
                        }
                ).followedBy("end").where(
                        new SimpleCondition<String>() {
                            @Override
                            public boolean filter(String end) throws Exception {
                                // "end": matches when a tracked set has exactly one element left and
                                // that element is the event; the key of that set is stored in temp1.
                                // If several tracked sets qualify, temp1 ends up holding the last one
                                // visited, and only that one is removed below.
                                flag = false;
                                for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {
                                    if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){
                                        flag = true;
                                        temp1 = entryEnd.getKey();
                                    }
                                }
                                // Stop tracking the completed set; done after the loop, presumably to
                                // avoid removing from temp2 while iterating over it.
                                if (flag)
                                    temp2.remove(temp1);
                                return flag;
                            }
                        }
                );
    
                // Apply the pattern to the input stream and turn every match into one text line.
                PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham, pattern);

                DataStream<String> result = patternStream.select(
                        new PatternSelectFunction<String, String>() {
                            /**
                             * Builds "Found e1 e2 ..." from the item set stored in deger under the
                             * key currently held in temp1. The matched events passed in {@code map}
                             * are not used.
                             */
                            @Override
                            public String select(Map<String, List<String>> map) throws Exception {
                                final List<String> matchedItems = new ArrayList<>(deger.get(temp1));
                                final StringBuilder found = new StringBuilder("Found");
                                for (String item : matchedItems) {
                                    found.append(" ").append(item);
                                }
                                return found.toString();
                            }
                        }
                );
                result.print();
    

    Based on my understanding of your question, a solution along these lines should work.