
Hadoop - Produce multiple values for a single key


I was able to successfully change the wordcount program in Hadoop to suit my requirement. However, I have another situation where I use the same key for 3 values. Let's say my input file is as below.

A Uppercase 1 firstnumber  I  romannumber a lowercase
B Uppercase 2 secondnumber II romannumber b lowercase

Currently in my map/reduce program, I am doing something like below. Here A is the key and 1 is the value.

A 1

I need my map/reduce job to produce something like below.

A 1 I a 

I can do them in 3 different programs like below and can produce the output.

A 1
A I
A a

However, I want to do this in a single program. Basically, from my map function I want to do this.

context.write(key,value1);
context.write(key,value2);
context.write(key,value3);

Is there any way I can do it in the same program rather than writing three different programs?

EDIT:

Let me provide a clearer example. I need to do something like below.

A uppercase 1 firstnumber  1.0 floatnumber str stringchecking
A uppercase 2 secondnumber 2.0 floatnumber ing stringchecking

My final output would be,

A 3 3.0 string

3 is the sum of the two integers, 3.0 is the sum of the two floats, and string is the concatenation of the two strings (str + ing).


Solution

  • First you'll need a composite writable for all three of your values.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;

    public class CompositeWritable implements Writable {
        int val1 = 0;
        float val2 = 0;
        String val3 = "";
    
        public CompositeWritable() {}
    
        public CompositeWritable(int val1, float val2, String val3) {
            this.val1 = val1;
            this.val2 = val2;
            this.val3 = val3;
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            val1 = in.readInt();
            val2 = in.readFloat();
            val3 = WritableUtils.readString(in);
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(val1);
            out.writeFloat(val2);
            WritableUtils.writeString(out, val3);
        }
    
        public void merge(CompositeWritable other) {
            this.val1 += other.val1;
            this.val2 += other.val2;
            this.val3 += other.val3;
        }
    
        @Override
        public String toString() {
            return this.val1 + "\t" + this.val2 + "\t" + this.val3;
        }
    }
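To see why `write` and `readFields` must mirror each other field for field, here is a plain-Java sketch of the same round trip and merge, runnable without Hadoop on the classpath. `writeUTF`/`readUTF` stand in for `WritableUtils.writeString`/`readString`, and the `Composite` class is a stand-in for the `CompositeWritable` above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Plain-Java stand-in for CompositeWritable: serialize the three fields in
// a fixed order, read them back in the same order.
class Composite {
    int val1; float val2; String val3;

    Composite(int val1, float val2, String val3) {
        this.val1 = val1; this.val2 = val2; this.val3 = val3;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(val1);
        out.writeFloat(val2);
        out.writeUTF(val3);   // stand-in for WritableUtils.writeString
    }

    static Composite readFields(DataInput in) throws IOException {
        // Same field order as write(), or the stream gets misread.
        return new Composite(in.readInt(), in.readFloat(), in.readUTF());
    }

    void merge(Composite other) {
        val1 += other.val1;          // sum ints
        val2 += other.val2;          // sum floats
        val3 += other.val3;          // concatenate strings
    }
}

public class RoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize, then deserialize from the raw bytes.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new Composite(1, 1.0f, "str").write(new DataOutputStream(buf));
        Composite row1 = Composite.readFields(
                new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // Merging the two example rows yields the expected final output.
        row1.merge(new Composite(2, 2.0f, "ing"));
        System.out.println(row1.val1 + "\t" + row1.val2 + "\t" + row1.val3);
        // prints: 3	3.0	string
    }
}
```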
    

    Then in your reduce you'll do something like this...

    public void reduce(Text key, Iterable<CompositeWritable> values, Context ctx)
            throws IOException, InterruptedException {

        // Start from an empty accumulator. Hadoop reuses the object the
        // iterator hands back, so copy the fields via merge() rather than
        // holding a reference to `next`.
        CompositeWritable out = new CompositeWritable();

        for (CompositeWritable next : values) {
            out.merge(next);
        }

        ctx.write(key, out);
    }
    

    Your mapper will simply output one CompositeWritable per map.
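The map-side work is just splitting each line and picking out the key and the three value fields. A plain-Java sketch of that parsing, assuming the whitespace-separated layout from the edited example (key at field 0, the three values at fields 2, 4 and 6):

```java
// Hypothetical parsing for input lines like
// "A uppercase 1 firstnumber 1.0 floatnumber str stringchecking".
// In the real mapper you would wrap the results in a Text key and a
// CompositeWritable value and call context.write(key, value) once.
public class MapParse {
    static String[] parse(String line) {
        String[] f = line.trim().split("\\s+");
        // f[0] = key, f[2] = int value, f[4] = float value, f[6] = string value
        return new String[] { f[0], f[2], f[4], f[6] };
    }

    public static void main(String[] args) {
        String[] p = parse("A uppercase 1 firstnumber 1.0 floatnumber str stringchecking");
        System.out.println(p[0] + " -> " + p[1] + ", " + p[2] + ", " + p[3]);
        // prints: A -> 1, 1.0, str
    }
}
```

Remember to register the value type in the driver as well, e.g. `job.setMapOutputValueClass(CompositeWritable.class)`, or Hadoop will not know how to deserialize it between map and reduce.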

    I haven't tried to compile this, but the general idea is there.