java apache-spark rdd java-pair-rdd

How to filter a JavaPairRDD using containsAll and contains


I have two collections: a 'list' and a 'pairRdd2', which contain the data shown below.

I am trying to apply a filter with containsAll that keeps a key only if its values in pairRdd2 contain every value in list. The expected result is joe,{US,UK}.

List<String> list = Arrays.asList("US","UK");

JavaRDD pairRdd = ctx.parallelize(Arrays.asList(new Tuple2("john","US"),new Tuple2("john","UAE"),new Tuple2("joe","US"),new Tuple2("joe","UK")));

JavaPairRDD<String, String> pairRdd2 = JavaPairRDD.fromJavaRDD(pairRdd);

pairRdd2.groupByKey().filter(x-> Arrays.asList(x._2).containsAll(list)).foreach(new VoidFunction<Tuple2<String,Iterable<String>>>() {

    @Override
    public void call(Tuple2<String, Iterable<String>> t) throws Exception {
        System.out.println(t._1());             
    }
});

Can someone point out what I am doing wrong?


Solution

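  • The problem is Arrays.asList(x._2). It wraps the whole Iterable in a one-element List<Iterable<String>>, so containsAll(list) compares the strings in list against that single Iterable and never matches. Copy the values returned by groupByKey into a collection and test that instead: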

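        pairRdd2.groupByKey().filter(f -> {
            // Copy the grouped values for this key into a Set.
            Set<String> set = new HashSet<>();
            for (String s : f._2())
                set.add(s);

            // Keep the key only if its values include every entry in list.
            return set.containsAll(list);
        });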
    

    You can also convert the Iterable to a collection directly and avoid the loop altogether.
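
    One possible way to do that conversion, sketched with plain JDK classes only so it runs without a Spark context. The class name IterableFilterSketch and the helpers toSet and hasAll are hypothetical names made up for this illustration; they are not part of Spark or of the original answer. The sketch also prints the size of Arrays.asList(iterable) to show why the original filter never matched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class IterableFilterSketch {

    // Hypothetical helper: copies any Iterable into a Set without an explicit loop.
    static <T> Set<T> toSet(Iterable<T> values) {
        return StreamSupport.stream(values.spliterator(), false)
                .collect(Collectors.toSet());
    }

    // Hypothetical helper: true when the values include every required entry.
    static <T> boolean hasAll(Iterable<T> values, List<T> required) {
        return toSet(values).containsAll(required);
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("US", "UK");

        // Stand-ins for the Iterable<String> that groupByKey yields per key.
        Iterable<String> john = Arrays.asList("US", "UAE");
        Iterable<String> joe = Arrays.asList("US", "UK");

        // Arrays.asList on an Iterable wraps it as a single element.
        System.out.println("wrapped size: " + Arrays.asList(joe).size()); // 1

        System.out.println("john: " + hasAll(john, list)); // false
        System.out.println("joe: " + hasAll(joe, list));   // true
    }
}
```

    With a helper like this on the classpath, the Spark filter from the question could be written as pairRdd2.groupByKey().filter(f -> IterableFilterSketch.hasAll(f._2(), list)).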