
Apache Flink Filter Function


I want to implement a custom filter function in Apache Flink, but I don't understand how I can inject a list of filter conditions into it without hard-wiring them.

Let's assume my function looks like this:

import org.apache.flink.api.common.functions.FilterFunction;

public class CustomFilter implements FilterFunction<String> {

    @Override
    public boolean filter(String value) throws Exception {
        // How can I pass this array or collection into my filter function?
        String[] values = {"First", "Second", "Last"};
        // Keep the element only if it contains every one of the values
        for (String s : values) {
            if (!value.contains(s)) return false;
        }
        return true;
    }
}

The streaming job will look like this:

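public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // ...
        env
            .fromElements("Data", "New Data", "First")
            .filter(new CustomFilter())
            .print();   // print() returns a sink, so execute() is called on env separately
        env.execute();
    }
}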

When I try to add some kind of collection to the parameters of the filter method in CustomFilter, like

public boolean filter(String s, Collection<String> searchValues){
    ...
}

I get an error telling me that the method may only take a single String parameter, because it implements the interface's filter() method.
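Adding a parameter does not change the method the interface requires; it only declares a separate overload. The stdlib-only sketch below uses `java.util.function.Predicate<String>` as an analogy for `FilterFunction<String>` (both have exactly one abstract, single-argument method). It is not Flink code, and `OverloadDemo`/`ContainsAll` are hypothetical names. It shows that a caller holding the interface type only ever invokes the one-argument method, so a two-argument version would never be called by the framework.

```java
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

// Analogy only: Predicate<String> stands in for Flink's FilterFunction<String>.
public class OverloadDemo {

    static class ContainsAll implements Predicate<String> {
        // Implements the interface method: this is what a caller holding a Predicate sees.
        @Override
        public boolean test(String value) {
            return true; // no search values are available here
        }

        // A two-argument method is only an overload; code that calls the
        // interface method never invokes it.
        public boolean test(String value, Collection<String> searchValues) {
            return searchValues.stream().allMatch(value::contains);
        }
    }

    public static void main(String[] args) {
        Predicate<String> p = new ContainsAll();
        System.out.println(p.test("Data")); // true: the one-argument version runs
        ContainsAll c = new ContainsAll();
        System.out.println(c.test("Data", List.of("First"))); // false
    }
}
```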


Solution

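  • As others have noted, pass the target values in through the constructor, store them in a field, and use that field in the filter() method. This works because Flink serializes the function object (including its fields) and ships it to the parallel instances, so the field values must be serializable; a String[] is.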

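    import org.apache.flink.api.common.functions.FilterFunction;

    public class CustomFilter implements FilterFunction<String> {

        private final String[] targetValues;

        public CustomFilter(String[] targetValues) {
            this.targetValues = targetValues;
        }

        @Override
        public boolean filter(String value) throws Exception {
            // Keep the element only if it contains every target value
            for (String s : targetValues) {
                if (!value.contains(s)) return false;
            }
            return true;
        }
    }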
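To see why constructor-injected state survives being shipped to the workers, here is a stdlib-only sketch. It does not use Flink. `ContainsAllFilter` is a hypothetical stand-in for the CustomFilter above: it takes a `List<String>` instead of an array, since the question asked about collections too. The serialize/deserialize round trip only roughly simulates how a function instance is copied; it is not Flink's actual mechanism. Flink's `Function` interface does extend `java.io.Serializable`, which is the relevant constraint.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for CustomFilter, in plain Java (no Flink dependency).
public class SerializedFilterDemo {

    static class ContainsAllFilter implements Serializable {
        private final List<String> targetValues; // injected once, travels with the object

        ContainsAllFilter(List<String> targetValues) {
            // Copy into an ArrayList, which is Serializable
            this.targetValues = new ArrayList<>(targetValues);
        }

        boolean filter(String value) {
            for (String s : targetValues) {
                if (!value.contains(s)) return false;
            }
            return true;
        }
    }

    // Serialize and deserialize an object, roughly simulating shipping it to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ContainsAllFilter copy = roundTrip(new ContainsAllFilter(Arrays.asList("Data")));
        for (String e : Arrays.asList("Data", "New Data", "First")) {
            if (copy.filter(e)) System.out.println(e); // prints "Data" and "New Data"
        }
    }
}
```

In the actual job, the filter from the answer would be wired in as `.filter(new CustomFilter(new String[] {"First", "Second", "Last"}))`. Note that with those targets none of the sample elements pass, because each element must contain all three strings.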