I have two collections: 'list' and 'pairRdd2', which contain the data shown below.
I am trying to apply a filter with containsAll so that a key in pairRdd2 is kept only if its values contain all the values in list. The expected result is joe,{US,UK}.
List<String> list = Arrays.asList("US","UK");
JavaRDD pairRdd = ctx.parallelize(Arrays.asList(new Tuple2("john","US"),new Tuple2("john","UAE"),new Tuple2("joe","US"),new Tuple2("joe","UK")));
JavaPairRDD<String, String> pairRdd2 = JavaPairRDD.fromJavaRDD(pairRdd);
pairRdd2.groupByKey().filter(x-> Arrays.asList(x._2).containsAll(list)).foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {
@Override
public void call(Tuple2<String, Iterable<String>> t) throws Exception {
System.out.println(t._1());
}
});
Can someone point out what I am doing wrong?
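The problem is with Arrays.asList(). Because x._2 is already an Iterable<String>, Arrays.asList(x._2) creates a list containing that single Iterable (a List<Iterable<String>>), not a list of its strings, so containsAll(list) compares strings against an Iterable and never matches. You should use the values given by groupByKey itself: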
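pairRdd2.groupByKey().filter(f -> {
    // Copy the grouped values for this key into a set
    Set<String> set = new HashSet<>();
    for (String s : f._2())
        set.add(s);
    // Keep the key only if its values include every entry in list
    return set.containsAll(list);
});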
You may also find a quick way to convert an iterable/iterator to a collection and avoid the loop altogether.
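One possible way to avoid the loop, sketched here in plain Java without Spark so it can be run on its own, is to stream the Iterable into a Set with StreamSupport. The class name IterableContainsAll and the method containsAllValues are hypothetical helpers made up for this illustration, not part of Spark.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical helper class for illustration; it is not part of Spark.
class IterableContainsAll {

    // Collects the Iterable into a Set via a sequential stream, then checks
    // that every required value is present among the collected values.
    static boolean containsAllValues(Iterable<String> values, Collection<String> required) {
        Set<String> seen = StreamSupport.stream(values.spliterator(), false)
                .collect(Collectors.toSet());
        return seen.containsAll(required);
    }

    public static void main(String[] args) {
        List<String> required = Arrays.asList("US", "UK");
        // Stand-ins for the grouped values that groupByKey would produce per key.
        Iterable<String> joe = Arrays.asList("US", "UK");
        Iterable<String> john = Arrays.asList("US", "UAE");
        System.out.println("joe: " + containsAllValues(joe, required));
        System.out.println("john: " + containsAllValues(john, required));
    }
}
```

Assuming the helper is on the classpath of the Spark job, the filter could then be written as pairRdd2.groupByKey().filter(f -> IterableContainsAll.containsAllValues(f._2(), list)).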