I am trying out this MapReduce code to calculate average values, but for some reason the averages are not calculated correctly. The goal is to calculate the average movie rating for every year.
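Mapper code:
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageRatingMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>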
{
//initialize the writable datatype variables
private final static DoubleWritable tempWritable = new DoubleWritable(0);
private Text ReleaseYear = new Text();
//Override the original map method
@Override
//map takes in three parameters
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
//Split the tab-separated input line into fields and store them in a String array called line
String[] line = value.toString().split("\t");
//Store the 4th field (index 3), which holds the release year in the data file, in a String variable called Year
String Year = line[3];
//Set the ReleaseYear Text object to the year read from the data file
ReleaseYear.set(Year);
//Parse the 15th field (index 14), which holds the rating, from String to double and store it in temp
double temp = Double.parseDouble(line[14].trim());
//Store temp in tempWritable
tempWritable.set(temp);
//Emit the year and this movie's rating to the reducer
context.write(ReleaseYear, tempWritable);
}
}
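As a side note on the mapper's parsing step: if the input ever contains a header row or a line with fewer than 15 fields, `Double.parseDouble` or the array index will throw and fail the map task. A minimal, more defensive sketch of the same parsing is shown below. It assumes the same layout as the mapper (tab-separated, year at index 3, rating at index 14); the class `RatingLineParser` and its `YearRating` holder are hypothetical helpers, not Hadoop APIs, and skipping bad lines is an addition not present in the original code.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical helper (not part of Hadoop): parses one tab-separated line into
// a (year, rating) pair, assuming year at index 3 and rating at index 14.
public class RatingLineParser {

    // Simple holder for one parsed record.
    public static final class YearRating {
        public final String year;
        public final double rating;

        YearRating(String year, double rating) {
            this.year = year;
            this.rating = rating;
        }
    }

    // Returns an empty Optional for lines that are too short or whose rating
    // field is not a number (for example a header row), instead of throwing.
    public static Optional<YearRating> parse(String line) {
        String[] fields = line.split("\t");
        if (fields.length < 15) {
            return Optional.empty();
        }
        try {
            double rating = Double.parseDouble(fields[14].trim());
            return Optional.of(new YearRating(fields[3].trim(), rating));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Build a 15-column line; only indices 3 and 14 matter for this demo.
        String[] cols = new String[15];
        Arrays.fill(cols, "x");
        cols[3] = "1912";
        cols[14] = "6.2";
        parse(String.join("\t", cols))
                .ifPresent(r -> System.out.println(r.year + " -> " + r.rating));
        // A short header-like line is rejected rather than crashing.
        System.out.println(parse("imdbtitle\ttitle\tyear").isPresent());
    }
}
```

Inside `map()`, you would call `RatingLineParser.parse(value.toString())`, return early when the result is empty, and otherwise set `ReleaseYear` and `tempWritable` from the parsed values before calling `context.write`.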
Reducer code:
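import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageRatingReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>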
{
//Create an arraylist of type double called ratingList
ArrayList<Double> ratingList = new ArrayList<Double>();
//Override reduce method
@Override
//reduce takes in three parameters
public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException
{
//create a variable SumofRating of type double and initialize it to 0
double SumofRating = 0.0;
//use a for loop to store the ratings in ratingList and sum them in the SumofRating variable
for(DoubleWritable value : values)
{
ratingList.add(value.get());
//calculate the cumulative sum
SumofRating = SumofRating + value.get();
}
//get the number of ratings in the ArrayList
int size = ratingList.size();
//calculate the average rating
double averageRating = SumofRating/size;
//Emit the year and the average rating to the output file
context.write(key, new DoubleWritable(averageRating));
}
}
And the main class:
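import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageRating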
{
public static void main(String[] args) throws Exception
{
//Create an object, conf, from the configuration class
Configuration conf = new Configuration();
if (args.length != 3)
{
System.err.println("Usage: AverageRating <input path> <output path>");
System.exit(-1);
}
//create an object, job, from the Job class
Job job;
//configure the parameters for the job
job = Job.getInstance(conf, "Average Rating");
//specify the driver class in the JAR file
job.setJarByClass(AverageRating.class);
//setting the input and output paths for the job
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
//Set the mapper and reducer for the job
job.setMapperClass(AverageRatingMapper.class);
job.setReducerClass(AverageRatingReducer.class);
//Set the key class (Text) and value class (DoubleWritable) for the job output data
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
//Delete output if it exists
FileSystem hdfs = FileSystem.get(conf);
Path outputDir = new Path(args[2]);
if(hdfs.exists(outputDir))
{
hdfs.delete(outputDir, true);
}
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Sample of the data, as requested:
imdbtitle | title | year | avg_rating |
---|---|---|---|
tt0000009 | XYZ | 1911 | 6.9 |
tt0001892 | PQR | 1912 | 6.2 |
tt0002154 | ABC | 1912 | 8.2 |
tt0000458 | JKL | 1913 | 6.3 |
tt0015263 | TGH | 1913 | 7.1 |
tt0000053 | PLO | 1912 | 4.9 |
NOTE: There are many more columns; I have included only the relevant ones.
The result is correct for the first year, but the averages for the second and third years are completely wrong.
Could someone please help me out?
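For reference, the per-year averages the job should produce for the six sample rows can be computed without Hadoop. A minimal plain-Java sketch follows; the class `ExpectedAverages` and its hard-coded `SAMPLE` array are hypothetical and hold only the year and avg_rating columns from the table above. It groups the rows by year in sorted order and averages each group, which gives 6.9 for 1911, about 6.433 (19.3 / 3) for 1912 and 6.7 for 1913.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ExpectedAverages {

    // {year, avg_rating} pairs copied from the sample table (hypothetical test data).
    public static final String[][] SAMPLE = {
        {"1911", "6.9"}, {"1912", "6.2"}, {"1912", "8.2"},
        {"1913", "6.3"}, {"1913", "7.1"}, {"1912", "4.9"}
    };

    // Groups rows by year (TreeMap keeps the keys sorted, like reducer input)
    // and averages the ratings within each year.
    public static Map<String, Double> averageByYear(String[][] rows) {
        return Arrays.stream(rows)
                .collect(Collectors.groupingBy(
                        row -> row[0],
                        TreeMap::new,
                        Collectors.averagingDouble(row -> Double.parseDouble(row[1]))));
    }

    public static void main(String[] args) {
        averageByYear(SAMPLE).forEach((year, avg) ->
                System.out.printf("%s\t%.4f%n", year, avg));
    }
}
```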
You don't really need an ArrayList in the reduce function, because each call to reduce() receives all the values grouped under a single key (in this case, all the ratings for one year), so you can simply count them while you iterate.
Additionally, declaring the ArrayList outside of the reduce function body is asking for trouble, since you only use the list to count values that you are already iterating over. A single Reducer instance processes many keys, calling reduce() once per key, and because ratingList is a field it is never cleared between calls: the ratings of each new year are appended to those of all the previous years. So the first key's average is correct, but every later key's sum is divided by the cumulative number of ratings seen so far, which makes those averages too low.
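The effect can be reproduced without Hadoop. The sketch below is a hypothetical stand-alone class (`ReducerStateDemo` is not part of Hadoop) that feeds the sample ratings, already grouped and sorted by year, to one object that keeps the list as a field, the way a single Reducer instance is reused for every key in a reduce task. Assuming all three years land on the same reducer (for example, a job with a single reduce task), it returns 6.9 for 1911, but 19.3 / 4 = 4.825 for 1912 and 13.4 / 6 ≈ 2.233 for 1913, because the divisor grows from 1 to 4 to 6 while the sum is correctly reset on each call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReducerStateDemo {

    // Mirrors the question's reducer: ratingList is an instance field, so it
    // keeps every rating from every key this object has ever processed.
    public static class BuggyReducer {
        private final List<Double> ratingList = new ArrayList<>();

        public double reduce(List<Double> values) {
            double sumOfRating = 0.0; // local, so it is reset on every call
            for (double v : values) {
                ratingList.add(v);
                sumOfRating += v;
            }
            // size() also counts the ratings of all previously processed years
            return sumOfRating / ratingList.size();
        }
    }

    public static void main(String[] args) {
        // Sample ratings grouped by year, in the sorted key order a reducer sees.
        Map<String, List<Double>> grouped = new LinkedHashMap<>();
        grouped.put("1911", Arrays.asList(6.9));
        grouped.put("1912", Arrays.asList(6.2, 8.2, 4.9));
        grouped.put("1913", Arrays.asList(6.3, 7.1));

        BuggyReducer reducer = new BuggyReducer(); // one instance, reused for every key
        for (Map.Entry<String, List<Double>> e : grouped.entrySet()) {
            System.out.printf("%s\t%.4f%n", e.getKey(), reducer.reduce(e.getValue()));
        }
    }
}
```

Creating a fresh `BuggyReducer` for every key would hide the problem, which is why the bug only shows up once the same instance handles more than one year.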
You can keep it more traditional by using a simple int variable named numOfRatings that counts the number of values (here, the ratings given for a specific year), and then dividing by it to find the average rating for each year.
public static class AverageRatingReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
@Override
public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException
{
// create the variables sumOfRating and numOfRatings and initialize them to 0
double sumOfRating = 0.0;
int numOfRatings = 0;
// use a for loop to sum the ratings and count how many there are
for(DoubleWritable value : values)
{
sumOfRating += value.get();
numOfRatings++;
}
// calculate the average rating
double averageRating = sumOfRating/numOfRatings;
// emit the year and the average rating to the output file
context.write(key, new DoubleWritable(averageRating));
}
}