Search code examples
javamapreducehadoop2movie

Understanding Mapreduce Code


I am trying to practice Big Data MapReduce by building a movie recommendation system. My code:

*imports



public class MRS {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();

            StringTokenizer token = new StringTokenizer(line);

        while(token.hasMoreTokens()){
            String userId = token.nextToken();
            String movieId = token.nextToken();
            String ratings =token.nextToken();
            token.nextToken();
            con.write(new Text(userId), new Text(movieId + "," + ratings));
        }

    }
}

public static class Reduce extends
        Reducer<Text, IntWritable, Text, Text> {
    public void reduce(Text key, Iterable<Text> value,Context con ) throws IOException, InterruptedException{
        int item_count=0;
        int item_sum =0;
        String result="[";
        for(Text t : value){
            String s = t.toString();
            StringTokenizer token = new StringTokenizer(s,",");
            while(token.hasMoreTokens()){
            token.nextToken();
            item_sum=item_sum+Integer.parseInt(token.nextToken());
            item_count++;
            }
            result=result+"("+s+"),";


        }
        result=result.substring(0, result.length()-1);
        result=result+"]";
        result=String.valueOf(item_count)+","+String.valueOf(item_sum)+","+result;

        con.write(key, new Text(result));
    }
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration con = new Configuration();
    Job job = new Job(con,"Movie Recommendation");

    job.setJarByClass(MRS.class);


    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);


    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);


    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);


    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));


    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

I am using the movielens dataset from here

Of which input file is u.data

and my output after running this code should be like

userId Item_count,Item_sum,[list of movie_Id with rating]

However, I am getting this

99  173,4
99  288,4
99  66,3
99  203,4
99  105,2
99  12,5
99  1,4
99  741,3
99  895,3
99  619,4
99  742,5
99  294,4
99  196,4
99  328,4
99  120,2
99  246,3
99  232,4
99  181,5
99  201,3
99  978,3
99  123,3
99  433,4
99  345,3

This should be the output of the Map class


Solution

  • I made a few adjustments to the code and it is now giving me the exact expected result. Here is my new code:

    imports*

    public class MRS {
    public static class Map extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] s = line.split("\t");
            StringTokenizer token = new StringTokenizer(line);
    
            while (token.hasMoreTokens()) {
                IntWritable userId = new IntWritable(Integer.parseInt(token
                        .nextToken()));
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(userId, new Text(movieId + "," + ratings));
            }
    
        }
    }
    
    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
    
                result = result + "[" + s + "],";
    
            }
            result = result.substring(1, result.length() - 2);
    
            System.out.println(result);
            con.write(key, new Text(result));
        }
    }
    
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");
    
        job.setJarByClass(MRS.class);
    
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
    
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
    
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    
    }
    
    }
    

    What I changed is the driver code:

    job.setOutputKeyClass(IntWritable.class);
    

    Mapper code

     Mapper<LongWritable, Text, IntWritable, Text>
    

    Reducer code

     public static class Reduce extends
        Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con) throws
     IOException, InterruptedException{
    

    I think the problem was that the reducer's declared value type (IntWritable) did not match what the mapper actually emitted (Text). Because of that mismatch, my reduce method never overrode Reducer.reduce, so Hadoop ran the default identity reducer — that is why the job printed the raw mapper output and never executed my reducer.

    Correct me if I am wrong.