Tags: java, apache-spark, distributed-computing

Spark mapPartitions iterator returns duplicate records


I have an implementation class for FlatMapFunction<Iterator<Tuple2<String, Iterable<String>>>, String>, which initialises some non-serialisable connections for each partition. But when I call next() on the iterator, it returns the same record for multiple partitions. Here is the code:

@Override
public Iterator<String> call(Iterator<Tuple2<String, Iterable<String>>> tuple2Iterator)
        throws Exception {
    // non-serialisable client, built once per partition
    BitLambdaService lambda = buildClient();
    List<String> resultList = new ArrayList<>();
    while (tuple2Iterator.hasNext()) {
        Tuple2<String, Iterable<String>> tpl = tuple2Iterator.next();
        // do something
    }
    return resultList.iterator();
}
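For context, this is roughly how the function is wired in; the input RDD, the pairing logic, and the class name MyFlatMapFunction are illustrative, not from my actual job:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// 'lines' is an assumed input JavaRDD<String>; MyFlatMapFunction is the
// class containing the call() method above (name is illustrative).
JavaPairRDD<String, Iterable<String>> grouped = lines
        .mapToPair(l -> new Tuple2<>(l.split(",")[0], l))
        .groupByKey();
JavaRDD<String> processed = grouped.mapPartitions(new MyFlatMapFunction());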

Has anyone faced this problem before, or does anyone know how to fix it?


Solution

  • Solved it by calling rdd.cache() just after the transformation. The problem occurred because transformations are evaluated lazily: they only actually run when an action is applied to the RDD. Without caching, mapPartitions was not waiting for the call method to complete and assigned the same records to another executor.
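
    For reference, a minimal sketch of the fix, continuing the illustrative snippet from the question (names and the output path are assumptions, not from the original post):

    // Cache immediately after the transformation, before any action runs.
    JavaRDD<String> processed = grouped.mapPartitions(new MyFlatMapFunction());
    processed.cache();                    // materialise each partition once
    long n = processed.count();           // first action triggers call() per partition
    processed.saveAsTextFile("/tmp/out"); // reuses the cached partitions instead of recomputing

    Without cache(), each action recomputes the lineage, so call() runs again for every action; caching makes the partitions materialise once and be reused.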