Tags: java, hadoop, mapreduce, hadoop2, hadoop-partitioning

Hadoop MapReduce: unordered tuple as map key


Based on the word count example from Hadoop: The Definitive Guide, I've developed a MapReduce job to count the occurrences of unordered tuples of Strings. The input looks like this (just larger):

a b
c c
d d
b a
a d
d d

Running MapReduce, I expect the output to be (for this example):

c c 1
d d 2
a b 2
a d 1

That is, I want the tuples a,b and b,a to be considered the same. The question has already been asked here: Hadoop MapReduce: Two values as key in Mapper-Reducer, and has probably been solved here: https://developer.yahoo.com/hadoop/tutorial/module5.html#keytypes.

For large input files I get output like this; the first column is the hashCode() of the respective key:

151757761 a a 62822
153322274 a b 62516
154886787 a c 62248
156451300 a d 62495
153322274 b a 62334
154902916 b b 62232
158064200 b d 62759
154886787 c a 62200
156483558 c b 124966
158080329 c c 62347
159677100 d c 125047
156451300 d a 62653
158064200 d b 62603
161290000 d d 62778

As can be seen, some keys are duplicated: a,b and b,a both appear, each with the hash code 153322274. For others, like c,b (and b,c) and d,c (and c,d), the count is correct: roughly double that of the other keys, as expected because the test data is drawn uniformly at random.

I've been searching for the cause for some time and have run out of ideas as to why there could still be duplicate keys after the reduce phase.

Below is the code I use:

First, the code for my custom WritableComparable:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;

public class Pair implements WritableComparable<Pair> {

    private String first;
    private String second;

    public Pair(String first, String second) {
        this.first = first;
        this.second = second;
    }

    public Pair() {
       this("", "");
    }

    @Override
    public String toString() {
        return this.hashCode() + "\t" + first + "\t" + second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        WritableUtils.writeString(out, first);
        WritableUtils.writeString(out, second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = WritableUtils.readString(in);
        second = WritableUtils.readString(in);
    }

    @Override
    public int hashCode() {
        BigInteger bA = BigInteger.ZERO;
        BigInteger bB = BigInteger.ZERO;

        for(int i = 0; i < first.length(); i++) {
            bA = bA.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(first.codePointAt(i))));
        }

        for(int i = 0; i < second.length(); i++) {
            bB = bB.add(BigInteger.valueOf(127L).pow(i+1).multiply(BigInteger.valueOf(second.codePointAt(i))));
        }

        return bA.multiply(bB).intValue();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof Pair) {
            Pair other = (Pair) o;

            boolean result = ( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                    || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 );

            return result;
        }
        return false;
    }

    @Override
    public int compareTo(Pair other) {
        if (( first.compareTo(other.first) == 0 && second.compareTo(other.second) == 0 )
                || ( first.compareTo(other.second) == 0 && second.compareTo(other.first) == 0 ) ) {
            return 0;
        } else {
            int cmp = first.compareTo( other.first );

            if (cmp != 0) {
                return cmp;
            }

            return second.compareTo( other.second );
        }
    }
}

And the rest:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PairCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length < 2) {
            System.err.println("Usage: paircount <in-dir> <out-dir>");
            System.exit(2);
        }

        // Job.getInstance replaces the deprecated Job constructor in Hadoop 2
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(PairCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        job.setMapOutputKeyClass(Pair.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Pair.class);
        job.setOutputValueClass(IntWritable.class);

        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Pair, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());

            while (itr.hasMoreTokens()) {
                context.write(new Pair(itr.nextToken(), itr.nextToken()), one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Pair, IntWritable, Pair, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Pair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;

            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write( key, result);
        }
    }
}

Edit: I added unit tests for the hashCode() and compareTo() functions. They work just fine.

import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

public class Tests  {
    @Test
    public void testPairComparison() {
        assertTrue( 0 == new Pair("a", "a").compareTo(new Pair("a", "a")) );
        assertTrue( 0 == new Pair("a", "b").compareTo(new Pair("b", "a")) );
        assertTrue( 0 == new Pair("a", "c").compareTo(new Pair("c", "a")) );
        assertTrue( 0 == new Pair("a", "d").compareTo(new Pair("d", "a")) );

        assertTrue( 0 == new Pair("b", "b").compareTo(new Pair("b", "b")) );
        assertTrue( 0 == new Pair("b", "c").compareTo(new Pair("c", "b")) );
        assertTrue( 0 == new Pair("b", "d").compareTo(new Pair("d", "b")) );

        assertTrue( 0 == new Pair("c", "c").compareTo(new Pair("c", "c")) );
        assertTrue( 0 == new Pair("c", "d").compareTo(new Pair("d", "c")) );

        assertTrue( 0 == new Pair("d", "d").compareTo(new Pair("d", "d")) );

        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("b", "b")) );
        assertTrue( 0 > new Pair("a", "a").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("d", "d").compareTo(new Pair("c", "b")) );
        assertTrue( 0 < new Pair("c", "d").compareTo(new Pair("c", "a")) );
    }

    @Test
    public void testPairHashcode(){
        assertTrue( 0 != new Pair("a", "a").hashCode());
        assertTrue( 0 != new Pair("a", "b").hashCode());
        assertTrue( 0 != new Pair("a", "c").hashCode());
        assertTrue( 0 != new Pair("a", "d").hashCode());

        assertTrue( 0 != new Pair("b", "b").hashCode());
        assertTrue( 0 != new Pair("b", "c").hashCode());
        assertTrue( 0 != new Pair("b", "d").hashCode());

        assertTrue( 0 != new Pair("c", "c").hashCode());
        assertTrue( 0 != new Pair("c", "d").hashCode());

        assertTrue( 0 != new Pair("d", "d").hashCode());

        assertEquals( new Pair("a", "a").hashCode(), new Pair("a", "a").hashCode() );
        assertEquals( new Pair("a", "b").hashCode(), new Pair("b", "a").hashCode() );
        assertEquals( new Pair("a", "c").hashCode(), new Pair("c", "a").hashCode() );
        assertEquals( new Pair("a", "d").hashCode(), new Pair("d", "a").hashCode() );

        assertEquals( new Pair("b", "b").hashCode(), new Pair("b", "b").hashCode() );
        assertEquals( new Pair("b", "c").hashCode(), new Pair("c", "b").hashCode() );
        assertEquals( new Pair("b", "d").hashCode(), new Pair("d", "b").hashCode() );

        assertEquals( new Pair("c", "c").hashCode(), new Pair("c", "c").hashCode() );
        assertEquals( new Pair("c", "d").hashCode(), new Pair("d", "c").hashCode() );

        assertEquals( new Pair("d", "d").hashCode(), new Pair("d", "d").hashCode() );

        assertNotEquals( new Pair("a", "a").hashCode(), new Pair("b", "b").hashCode() );
        assertNotEquals( new Pair("a", "b").hashCode(), new Pair("b", "d").hashCode() );
        assertNotEquals( new Pair("a", "c").hashCode(), new Pair("d", "a").hashCode() );
        assertNotEquals( new Pair("a", "d").hashCode(), new Pair("a", "a").hashCode() );
    }
}

But I realized that changing compareTo() to always return 0 results in every pair being considered the same, giving the output:

156483558 c b 1000000

whereas changing hashCode() to always return 0 (for the same input data as above) gives the same result as above, just with the hash column being zero:

0 a a 62822
0 a b 62516
0 a c 62248
0 a d 62495
0 b a 62334
0 b b 62232
0 b d 62759
0 c a 62200
0 c b 124966
0 c c 62347
0 d c 125047
0 d a 62653
0 d b 62603
0 d d 62778
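
A note on why the constant hashCode() changes nothing here: Hadoop's default HashPartitioner uses the key's hash only to pick the reducer partition, not to decide which keys are merged inside a reducer. The sketch below reproduces that arithmetic without any Hadoop dependency. The class and method names are hypothetical, and the reducer counts are assumptions, since the question does not state how many reducers the job ran with.

```java
public class PartitionSketch {

    // Same arithmetic as the default HashPartitioner: clear the sign bit,
    // then take the remainder by the number of reduce tasks.
    static int partitionFor(int keyHash, int numReduceTasks) {
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Hash values taken from the output above, plus the constant 0.
        int[] hashes = {153322274, 156483558, 0};
        // Reducer counts are illustrative assumptions.
        for (int reducers : new int[] {1, 4}) {
            for (int h : hashes) {
                System.out.println("hash " + h + " with " + reducers
                        + " reducer(s) -> partition " + partitionFor(h, reducers));
            }
        }
    }
}
```

With a single reducer every key lands in partition 0 whatever hashCode() returns, so grouping within that reducer has to come from somewhere else, namely the sort order defined by compareTo().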

Edit:

I investigated further, making compareTo() print what is being compared. This showed that some keys, like a,b and b,a, are never compared to each other and are therefore not grouped.

If not all keys are compared to each other, how is grouping possible at all (other than by using hashCode(), which it evidently doesn't)?

I guess there is some tiny thing I am missing. I'm glad for any ideas! Thank you very much in advance.

best regards


Solution

  • The problem is in the compareTo() function. First check whether the pairs are equal in the sense that a,b equals b,a. If they are not, compare the smaller values of the two pairs and, if those match, compare the larger values of the respective pairs. This solves the issue.

    This is how I implemented it now:

    @Override
    public int compareTo(Pair other){
        int cmpFirstFirst = first.compareTo(other.first);
        int cmpSecondSecond =  second.compareTo(other.second);
        int cmpFirstSecond = first.compareTo(other.second);
        int cmpSecondFirst =  second.compareTo(other.first);
    
        if ( cmpFirstFirst == 0 && cmpSecondSecond == 0 || cmpFirstSecond == 0 && cmpSecondFirst == 0) {
            return 0;
        }
    
        String thisSmaller;
        String otherSmaller;
    
        String thisBigger;
        String otherBigger;
    
        if ( this.first.compareTo(this.second) < 0 ) {
            thisSmaller = this.first;
            thisBigger = this.second;
        } else {
            thisSmaller = this.second;
            thisBigger = this.first;
        }
    
        if ( other.first.compareTo(other.second) < 0 ) {
            otherSmaller = other.first;
            otherBigger = other.second;
        } else {
            otherSmaller = other.second;
            otherBigger = other.first;
        }
    
        int cmpThisSmallerOtherSmaller = thisSmaller.compareTo(otherSmaller);
        int cmpThisBiggerOtherBigger = thisBigger.compareTo(otherBigger);
    
        if (cmpThisSmallerOtherSmaller == 0) {
            return cmpThisBiggerOtherBigger;
        } else {
            return cmpThisSmallerOtherSmaller;
        }
    }
    

    This means that, contrary to my assumption, the map output is grouped by sorting the keys and relying on a transitive ordering, rather than by comparing every key with every other key (a cross product). A consistent order of the keys is necessary. This makes total sense once you know and understand it.
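
To make the transitivity point concrete, here is a minimal standalone sketch (no Hadoop involved). It models pairs as String[2] arrays, which is an assumption for brevity, and all class, field, and method names are hypothetical. BROKEN mirrors the compareTo() from the question; FIXED orders by the smaller element and then the larger one, which is the same idea as the solution above. groupAdjacent only approximates what the framework does: sort, then merge runs of equal neighbouring keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PairOrderingSketch {

    // Logic of the question's compareTo(): swapped pairs compare equal,
    // everything else is ordered by (first, second) as written.
    static final Comparator<String[]> BROKEN = (x, y) -> {
        boolean same = x[0].equals(y[0]) && x[1].equals(y[1]);
        boolean swapped = x[0].equals(y[1]) && x[1].equals(y[0]);
        if (same || swapped) {
            return 0;
        }
        int cmp = x[0].compareTo(y[0]);
        return cmp != 0 ? cmp : x[1].compareTo(y[1]);
    };

    static String smaller(String[] p) { return p[0].compareTo(p[1]) <= 0 ? p[0] : p[1]; }
    static String larger(String[] p)  { return p[0].compareTo(p[1]) <= 0 ? p[1] : p[0]; }

    // Order by the smaller element first, then by the larger one.
    static final Comparator<String[]> FIXED = (x, y) -> {
        int cmp = smaller(x).compareTo(smaller(y));
        return cmp != 0 ? cmp : larger(x).compareTo(larger(y));
    };

    // Sort the keys, then count runs of neighbours that compare equal.
    static List<String> groupAdjacent(List<String[]> input, Comparator<String[]> order) {
        List<String[]> sorted = new ArrayList<>(input);
        sorted.sort(order);
        List<String> groups = new ArrayList<>();
        String[] head = null;
        String[] prev = null;
        int count = 0;
        for (String[] p : sorted) {
            if (prev != null && order.compare(prev, p) == 0) {
                count++;
            } else {
                if (head != null) {
                    groups.add(smaller(head) + " " + larger(head) + " " + count);
                }
                head = p;
                count = 1;
            }
            prev = p;
        }
        if (head != null) {
            groups.add(smaller(head) + " " + larger(head) + " " + count);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] ab = {"a", "b"}, ac = {"a", "c"}, ba = {"b", "a"};
        // BROKEN says ab < ac and ac < ba, yet ab == ba: not transitive.
        System.out.println(BROKEN.compare(ab, ac) < 0);
        System.out.println(BROKEN.compare(ac, ba) < 0);
        System.out.println(BROKEN.compare(ab, ba) == 0);

        // The small example input from the question.
        List<String[]> input = Arrays.asList(
                new String[] {"a", "b"}, new String[] {"c", "c"}, new String[] {"d", "d"},
                new String[] {"b", "a"}, new String[] {"a", "d"}, new String[] {"d", "d"});
        System.out.println(groupAdjacent(input, FIXED));
    }
}
```

Under the broken ordering, a sort may place a,c between a,b and b,a, so the two equal keys are never neighbours and end up in separate groups. An alternative design with the same effect would be to store each pair in canonical order (smaller element first) when it is constructed, so that compareTo(), equals(), and hashCode() can all use plain field-by-field logic.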