I want to implement a custom filter function in Apache Flink, but I don't see how to inject a list of filter conditions into it without hard-wiring them.
Let's assume my function looks like this:
public class CustomFilter implements FilterFunction {
    @Override
    public boolean filter(Object o) throws Exception {
        // How can I pass this array or collection to my filter function?
        String[] values = {"First", "Second", "Last"};
        for (String s : values) {
            if (!o.toString().contains(s)) return false;
        }
        return true;
    }
}
The streaming job will look like this:
public class StreamingJob {
    ...
    env
        .fromElements("Data", "New Data", "First")
        .filter(new CustomFilter())
        .print();
    env.execute();
}
When I try to add some kind of collection to the parameters of filter() in the class, like
public boolean filter(String s, Collection<String> searchValues) {
    ...
}
I get a message that the function may only take the single String argument, which makes sense, as it implements the interface method.
As others have noted, just save the list of target values that you pass in via the constructor, and use them in the filter() method.
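public class CustomFilter implements FilterFunction<Object> {
    private final String[] targetValues;
    public CustomFilter(String[] targetValues) {
        this.targetValues = targetValues;
    }
    @Override
    public boolean filter(Object o) throws Exception {
        for (String s : targetValues) // every injected value must match
            if (!o.toString().contains(s)) return false;
        return true;
    }
}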
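To see the constructor approach end to end, here is a minimal sketch that runs without Flink on the classpath. The nested FilterFunction interface is only a stand-in for Flink's real one, and the names CustomFilterSketch and keepMatching are hypothetical helpers I made up for illustration. I also typed the filter to String here, on the assumption that the stream carries strings as in the question.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomFilterSketch {

    // Stand-in for org.apache.flink.api.common.functions.FilterFunction,
    // declared locally (assumption) so the sketch compiles on its own.
    interface FilterFunction<T> extends Serializable {
        boolean filter(T value) throws Exception;
    }

    // The filter conditions are injected once through the constructor.
    static class CustomFilter implements FilterFunction<String> {
        private final String[] targetValues;

        CustomFilter(String[] targetValues) {
            // Copy so later changes to the caller's array do not affect the filter.
            this.targetValues = targetValues.clone();
        }

        @Override
        public boolean filter(String value) {
            // Keep the element only if it contains every target value.
            for (String s : targetValues) {
                if (!value.contains(s)) return false;
            }
            return true;
        }
    }

    // Hypothetical helper that mimics what a filter step does to a list of elements.
    static List<String> keepMatching(List<String> input, CustomFilter filter) {
        List<String> kept = new ArrayList<>();
        for (String element : input) {
            if (filter.filter(element)) kept.add(element);
        }
        return kept;
    }

    public static void main(String[] args) {
        String[] conditions = {"First", "Second", "Last"};
        CustomFilter filter = new CustomFilter(conditions);
        List<String> input = Arrays.asList("Data", "New Data", "First", "First Second Last");
        System.out.println(keepMatching(input, filter)); // prints [First Second Last]
    }
}
```

In the actual job the call would then be something like .filter(new CustomFilter(new String[]{"First", "Second", "Last"})). Since Flink serializes the function instance to ship it to the workers, whatever you store in the field should be serializable; a String[] is.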