
How to reset Iterator on MapReduce Function in Apache Spark


I'm new to Apache Spark. I want to know how to reset the iterator inside a MapReduce function in Apache Spark, so I wrote

Iterator<Tuple2<String,Set<String>>> iter = arg0;    

but it doesn't work. Below is the class implementing the MapReduce functions in Java.

class CountCandidates implements Serializable,
    PairFlatMapFunction<Iterator<Tuple2<String,Set<String>>>, Set<String>, Integer>,
    Function2<Integer, Integer, Integer>{

    private List<Set<String>> currentCandidatesSet;
    public CountCandidates(final List<Set<String>> currentCandidatesSet) {
        this.currentCandidatesSet = currentCandidatesSet;
    }

    @Override
    public Iterable<Tuple2<Set<String>, Integer>> call(
            Iterator<Tuple2<String, Set<String>>> arg0)
            throws Exception {
        List<Tuple2<Set<String>,Integer>> resultList = 
                new LinkedList<Tuple2<Set<String>,Integer>>();

        for(Set<String> currCandidates : currentCandidatesSet){
            Iterator<Tuple2<String,Set<String>>> iter = arg0;
            while(iter.hasNext()){
                Set<String> events = iter.next()._2;
                if(events.containsAll(currCandidates)){
                    Tuple2<Set<String>, Integer> t = 
                            new Tuple2<Set<String>, Integer>(currCandidates,1);
                    resultList.add(t);
                }
            }
        }

        return resultList;
    }

    @Override
    public Integer call(Integer arg0, Integer arg1) throws Exception {
        return arg0+arg1;
    }
}

If the iterator cannot be reset inside the function, how can I iterate over the parameter arg0 several times? I have already tried a different approach, shown in the following code, but it also doesn't work: 'resultList' ends up containing more data than I expected.

        while(arg0.hasNext()){
            Set<String> events = arg0.next()._2;
            for(Set<String> currentCandidates : currentCandidatesSet){
                if(events.containsAll(currentCandidates)){
                    Tuple2<Set<String>, Integer> t = 
                            new Tuple2<Set<String>, Integer>(currentCandidates,1);
                    resultList.add(t);
                }
            }
        }

How can I solve it?

Thanks in advance for your answer, and sorry for my poor English. If you don't understand my question, please leave a comment.


Solution

  • The Hadoop iterator could theoretically be reset to the beginning if it were cloneable. Resetting to the beginning would be acceptable in a MapReduce framework, since you would still read the file from the beginning and keep good overall speed. Resetting the iterator to a random point, however, would run counter to the MapReduce mindset, because it would likely require random access into a file.

    There is a ticket in Hadoop's JIRA explaining why they chose not to make the iterator cloneable, although it does indicate that this would be possible, since the values would not have to be stored in memory.
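
    In practice, the usual workaround is to drain the one-shot partition iterator into a `List` once, then re-traverse the list as many times as needed. The sketch below illustrates the idea in plain Java (it uses `Set<String>` elements directly instead of Spark's `Tuple2`, and the names `countMatches`/`BufferIteratorDemo` are illustrative, not from the original code); it assumes each partition is small enough to fit in memory.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class BufferIteratorDemo {

        // A partition iterator can only be consumed once, so buffer its
        // elements into a List and iterate the List repeatedly instead.
        static List<Set<String>> countMatches(Iterator<Set<String>> partition,
                                              List<Set<String>> candidates) {
            // Drain the one-shot iterator into a reusable buffer.
            List<Set<String>> buffered = new ArrayList<>();
            while (partition.hasNext()) {
                buffered.add(partition.next());
            }

            // Now the outer loop over candidates works as originally intended,
            // because the buffered list can be traversed once per candidate.
            List<Set<String>> matched = new ArrayList<>();
            for (Set<String> candidate : candidates) {
                for (Set<String> events : buffered) {
                    if (events.containsAll(candidate)) {
                        matched.add(candidate);
                    }
                }
            }
            return matched;
        }

        public static void main(String[] args) {
            List<Set<String>> data = Arrays.asList(
                    new HashSet<>(Arrays.asList("a", "b", "c")),
                    new HashSet<>(Arrays.asList("a", "b")));
            List<Set<String>> candidates = Arrays.asList(
                    new HashSet<>(Arrays.asList("a", "b")));
            // Both event sets contain {a, b}, so two matches are emitted.
            System.out.println(countMatches(data.iterator(), candidates).size()); // prints 2
        }
    }
    ```

    The trade-off is memory: the whole partition is materialized at once, which is exactly what the lazy iterator was designed to avoid, so this only makes sense when partitions are reasonably small.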