
Java Hadoop MapReduce Chaining Job


I have some code that correctly selects the source and the highest weight, but I can't seem to pull in the target column as well. Could someone point me in the right direction? I've never used Java before. I think the reducer needs to return a tuple, so do the values the mapper emits (the targets the reducer iterates over) need to carry this tuple?

Desired output: each line contains a node ID, followed by a tab (\t) and a "tgt,weight" tuple, where tgt is the target with the highest weight. In the event of a tie, return the tgt with the lowest number.

INPUT

src    tgt    weight
1      110    3
1      200    1
20     150    30
10     110    10
11     130    15
11     200    67
1      70     3

EXPECTED OUTPUT

1     70,3
20    150,30
10    110,10
11    200,67

CURRENT OUTPUT (still missing the tgt column in the tuple)

1     3
20    30
10    10
11    67

My code so far:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Q1 {


  public static class TargetMapper extends Mapper<Object, Text, Text, IntWritable> {

      private Text target = new Text();

      // Emits (src, weight) for every edge; the tgt column (edge[1]) is dropped here.
      public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString(), "\r");
            while (st.hasMoreTokens()) {
                String[] edge = st.nextToken().split("\t");
                target.set(edge[0]);
                context.write(target, new IntWritable(Integer.parseInt(edge[2])));
            }
        }
    }

  public static class EmailsReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable totalCount = new IntWritable();

      // Keeps only the highest weight per src; the matching tgt is lost here.
      public void reduce(Text key, Iterable<IntWritable> targets, Context context) throws IOException, InterruptedException {
            int max = 0;
            for (IntWritable target : targets) {
                if (target.get() > max || max == 0) {
                    max = target.get();
                }
            }
            totalCount.set(max);
            context.write(key, totalCount);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Q1");

        job.setJarByClass(Q1.class);
        job.setMapperClass(TargetMapper.class);
        job.setCombinerClass(EmailsReducer.class);
        job.setReducerClass(EmailsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Solution

  • You are interested in a custom output value. To achieve that, implement a custom WritableComparable that carries both tgt and weight. You may have to adapt the logic below to your exact needs.

    Something like:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Objects;
    
    public class MyWritable implements WritableComparable<MyWritable> {
        private IntWritable tgt;
        private IntWritable weight;
    
        public MyWritable() {
            set(new IntWritable(), new IntWritable());
        }
    
        public MyWritable(int tgt, int weight) {
            set(new IntWritable(tgt), new IntWritable(weight));
        }
    
        public MyWritable(IntWritable tgt, IntWritable weight) {
            set(tgt, weight);
        }
    
        public IntWritable getTgt() {
            return tgt;
        }
    
        public IntWritable getWeight() {
            return weight;
        }
    
        public void set(IntWritable tgt, IntWritable weight) {
            this.tgt = tgt;
            this.weight = weight;
        }
    
        @Override
        public int compareTo(MyWritable o) {
            int cmp = tgt.compareTo(o.tgt);
            if (cmp == 0) {
                return weight.compareTo(o.weight);
            }
            return cmp;
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            tgt.write(dataOutput);
            weight.write(dataOutput);
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            tgt.readFields(dataInput);
            weight.readFields(dataInput);
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            MyWritable that = (MyWritable) o;
            return Objects.equals(tgt, that.tgt) &&
                    Objects.equals(weight, that.weight);
        }
    
        @Override
        public int hashCode() {
            return Objects.hash(tgt, weight);
        }

        // TextOutputFormat writes value.toString() after the key and tab,
        // so this yields the desired "tgt,weight" form in the output file.
        @Override
        public String toString() {
            return tgt + "," + weight;
        }
    }
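
    A quick standalone way to convince yourself the writable round-trips and prints correctly (a sanity check, not part of the job; the class name MyWritableCheck is made up here):

    import java.io.*;

    public class MyWritableCheck {
        public static void main(String[] args) throws IOException {
            MyWritable a = new MyWritable(70, 3);

            // Serialize the way Hadoop does during the shuffle...
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            a.write(new DataOutputStream(bytes));

            // ...then read it back into a fresh instance.
            MyWritable b = new MyWritable();
            b.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(a.equals(b)); // true
            System.out.println(b);           // 70,3
        }
    }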
    

    Then update your job to use this as the value type in both the Mapper and the Reducer:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    
    public class Q1 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Q1");
    
            job.setJarByClass(Q1.class);
            job.setMapperClass(TargetMapper.class);
            job.setCombinerClass(EmailsReducer.class);
            job.setReducerClass(EmailsReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MyWritable.class);
            job.setMapOutputValueClass(MyWritable.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    
        public static class TargetMapper extends Mapper<Object, Text, Text, MyWritable> {
            // Emit src as the key and a (tgt, weight) writable as the value.
            public void map(Object key, Text value, Context context
            ) throws IOException, InterruptedException {
                StringTokenizer st = new StringTokenizer(value.toString(), "\r");
                while (st.hasMoreTokens()) {
                    String[] edge = st.nextToken().split("\t");
                    Text target = new Text();
                    target.set(edge[0]);
                    int tgt = Integer.parseInt(edge[1]);
                    int weight = Integer.parseInt(edge[2]);
                    context.write(target, new MyWritable(tgt, weight));
                }
            }
    
        }
    
        public static class EmailsReducer extends Reducer<Text, MyWritable, Text, MyWritable> {
            private MyWritable res = new MyWritable();
    
        public void reduce(Text key, Iterable<MyWritable> targets, Context context) throws IOException, InterruptedException {
            int maxWeight = Integer.MIN_VALUE;
            int maxTgt = Integer.MIN_VALUE;

            for (MyWritable target : targets) {
                int weight = target.getWeight().get();
                int tgt = target.getTgt().get();
                // Higher weight wins; on a tie, the lower tgt wins,
                // matching the tie-breaking rule in the question.
                if (weight > maxWeight || (weight == maxWeight && tgt < maxTgt)) {
                    maxWeight = weight;
                    maxTgt = tgt;
                }
            }

            res.set(new IntWritable(maxTgt), new IntWritable(maxWeight));
            context.write(key, res);
        }
        }
    }
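
    For intuition, here is the reducer's selection rule in plain Java over the src=1 rows from the sample input (a standalone illustration, nothing Hadoop-specific; the class name TieBreakDemo is made up):

    public class TieBreakDemo {
        public static void main(String[] args) {
            // (tgt, weight) pairs for src=1: 110/3, 200/1, 70/3.
            int[][] rows = {{110, 3}, {200, 1}, {70, 3}};

            int bestTgt = Integer.MIN_VALUE;
            int bestWeight = Integer.MIN_VALUE;
            for (int[] r : rows) {
                // Higher weight wins; on a tie, the lower tgt wins.
                if (r[1] > bestWeight || (r[1] == bestWeight && r[0] < bestTgt)) {
                    bestTgt = r[0];
                    bestWeight = r[1];
                }
            }
            System.out.println(bestTgt + "," + bestWeight); // prints 70,3
        }
    }

    This matches the expected row for src 1 (70,3): tgt 110 and tgt 70 tie at weight 3, and 70 is the lower number. Using the reducer as a combiner also remains safe with this rule, since max-with-lowest-tgt tie-breaking is associative and commutative.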