MapReduce: Average calculation is not working


I am trying out this MapReduce code to calculate average values, but for some reason the average is not calculated properly. The idea is to calculate the average movie rating for every year.

Mapper code

public class AverageRatingMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
    //initialize the writable datatype variables
    private final static DoubleWritable tempWritable = new DoubleWritable(0);
    private Text ReleaseYear = new Text();
    
    //Override the original map methods
    @Override
    //map takes in three parameters
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
    {
        //creating an array called line of type String
        //Split the values in the fields and store each value in one array element
        String[] line = value.toString().split("\t");
        
        //create a variable Year of type String and store the 4th element (index 3) of the line array. This contains the year in the data file
        String Year = line[3];
        
        //Set the value of the year object created from the Text class to be the value of the Year read from the data file
        ReleaseYear.set(Year);
        
        //Create a variable, temp, of type double, convert the value stored in the 15th element (index 14) of the line array from String to double, and store it in temp
        double temp = Double.parseDouble(line[14].trim());
        
        //Store temp in tempWritable
        tempWritable.set(temp);
        
        //Emit Year and the average rating in tempWritable to the Reducer class
        context.write(ReleaseYear, tempWritable);
        
        
    }

}

Reducer code:

public class AverageRatingReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    //Create an arraylist of type double called ratingList
    ArrayList<Double> ratingList = new ArrayList<Double>();
    
    //Override reduce method
    @Override
    //reduce takes in three parameters
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException
    {
        //create a variable SumofRating of type double and initialize it to 0
        double SumofRating = 0.0;
        
        //use a for loop to store ratings in the ratingList array and sum the ratings in the SumofRatings variable
        for(DoubleWritable value : values)
        {
            ratingList.add(value.get());
            
            //calculate the cumulative sum
            SumofRating = SumofRating + value.get();
            
        }
        
        //get the number of rating in the arrayList
        int size = ratingList.size();
        
        //calculate the average rating
        double averageRating = SumofRating/size;
        
        //Emit the year and the average rating to the output file
        context.write(key, new DoubleWritable(averageRating));
    }
 
}

And the main class:

public class AverageRating 
{
    
    public static void main(String[] args) throws Exception
    {
        //Create an object, conf, from the configuration class
        Configuration conf = new Configuration();
        if (args.length != 3)
        {
            System.err.println("Usage: AverageRating <input path> <output path>");
            System.exit(-1);
        }
        
        //create an object, job,  from the Job class
        Job job;
        
        //configure the parameters for the job
        job = Job.getInstance(conf, "Average Rating");
        
        //specify the driver class in the JAR file
        job.setJarByClass(AverageRating.class);
        
        
        //setting the input and output paths for the job
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        
        //Set the mapper and reducer for the job
        job.setMapperClass(AverageRatingMapper.class);
        job.setReducerClass(AverageRatingReducer.class);
        
        //Set the key class (Text) and value class (DoubleWritable) for the job output data
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        
        //Delete output if it exists
        FileSystem hdfs = FileSystem.get(conf);
        Path outputDir = new Path(args[2]);
        if(hdfs.exists(outputDir))
        {
            hdfs.delete(outputDir, true);
        }
        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        
    }

}

Sample of Data as requested:

imdbtitle title year avg_rating
tt0000009 XYZ 1911 6.9
tt0001892 PQR 1912 6.2
tt0002154 ABC 1912 8.2
tt0000458 JKL 1913 6.3
tt0015263 TGH 1913 7.1
tt0000053 PLO 1912 4.9

NOTE: there are many more columns. I have just added the important ones

The result is correct for the first year, but the results for the second and third years are completely inaccurate.

  • 1911 6.9
  • 1912 5.25
  • 1913 2.2

Could someone please help me out?


Solution

  • You don't really need an ArrayList in the reduce function, because each call to reduce() receives all the values grouped under one key (so in this case, each call gets all the ratings for a single year).

    Additionally, declaring the ArrayList as a field outside the reduce method invites trouble, since you only use the list to count values that reduce() already receives. My guess is that the same reducer instance is reused across reduce() calls, so the list is never cleared and keeps accumulating ratings from the previous keys (years, here). The first key-value pair is therefore correct, but every later average is too low: the sum covers only the current year, while the list size counts every rating seen so far.

    You can keep it simpler by using a plain int variable named numOfRatings that counts the values (the ratings for a specific year, here), and dividing the sum by that variable to find the average rating for each year.

    public static class AverageRatingReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
    {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
        {
            // both variables are local, so they start from zero on every call (that is, for every year)
            double sumOfRating = 0.0;
            int numOfRatings = 0;
    
            // use a for loop to sum the ratings and count them at the same time
            for(DoubleWritable value : values)
            {
                sumOfRating += value.get();
                numOfRatings++;
            }
                    
            // calculate the average rating
            double averageRating = sumOfRating/numOfRatings;
            
            // emit the year and the average rating to the output file
            context.write(key, new DoubleWritable(averageRating));
        }
    }
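
To see the effect without a cluster, here is a minimal plain-Java sketch that imitates one reducer instance handling the three years from the sample data in order. The class ReducerStateDemo and its two methods are hypothetical names made up for this illustration, not Hadoop API. It assumes, as guessed above, that the list field survives between calls.

```java
import java.util.ArrayList;
import java.util.List;

public class ReducerStateDemo {
    // Shared across calls, like the ratingList field in the question's reducer.
    private final List<Double> ratingList = new ArrayList<>();

    // Mimics the original reduce(): the divisor is the size of a list that is
    // never cleared, so it also counts ratings from keys handled earlier.
    public double buggyAverage(double[] ratings) {
        double sum = 0.0;
        for (double rating : ratings) {
            ratingList.add(rating);
            sum += rating;
        }
        return sum / ratingList.size();
    }

    // Mimics the corrected reduce(): sum and count are local to each call.
    public double fixedAverage(double[] ratings) {
        double sum = 0.0;
        int count = 0;
        for (double rating : ratings) {
            sum += rating;
            count++;
        }
        return sum / count;
    }

    public static void main(String[] args) {
        // Ratings from the sample rows in the question, grouped by year.
        String[] years = {"1911", "1912", "1913"};
        double[][] ratingsByYear = {{6.9}, {6.2, 8.2, 4.9}, {6.3, 7.1}};

        ReducerStateDemo reducer = new ReducerStateDemo();
        for (int i = 0; i < years.length; i++) {
            System.out.println(years[i]
                    + "\tbuggy=" + reducer.buggyAverage(ratingsByYear[i])
                    + "\tfixed=" + reducer.fixedAverage(ratingsByYear[i]));
        }
    }
}
```

In the buggy version the divisors are 1, 4 and 6 instead of 1, 3 and 2, so only the first year comes out right (roughly 6.9, 4.83 and 2.23, against roughly 6.9, 6.43 and 6.7 for the fixed version). The question shows only a sample of the data, so these figures will not necessarily match the output reported there.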