I am trying to practice Big Data MapReduce by building a movie recommendation system. My code:
*imports
public class MRS {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                String userId = token.nextToken();
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(new Text(userId), new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "[";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                while (token.hasMoreTokens()) {
                    token.nextToken();
                    item_sum = item_sum + Integer.parseInt(token.nextToken());
                    item_count++;
                }
                result = result + "(" + s + "),";
            }
            result = result.substring(0, result.length() - 1);
            result = result + "]";
            result = String.valueOf(item_count) + "," + String.valueOf(item_sum) + "," + result;
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");
        job.setJarByClass(MRS.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I am using the MovieLens dataset from here; the input file is u.data.
After running this code, my output should look like this:
userId Item_count,Item_sum,[list of movie_Id with rating]
However, I am getting this:
99 173,4
99 288,4
99 66,3
99 203,4
99 105,2
99 12,5
99 1,4
99 741,3
99 895,3
99 619,4
99 742,5
99 294,4
99 196,4
99 328,4
99 120,2
99 246,3
99 232,4
99 181,5
99 201,3
99 978,3
99 123,3
99 433,4
99 345,3
This looks like the output of the Map class rather than the output of the reducer.
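As a reference point, the expected per-user line (userId Item_count,Item_sum,[list of movie_Id with rating]) can be sketched in plain Java without Hadoop. This is only an illustration: the class name UserAggregateSketch and the helper aggregate are hypothetical, and the sample values are the first three rows of the output above.

```java
import java.util.ArrayList;
import java.util.List;

public class UserAggregateSketch {
    // Builds "count,sum,[(movieId,rating),...]" from "movieId,rating" strings.
    // Hypothetical helper; it mirrors what the reducer is meant to produce.
    public static String aggregate(List<String> values) {
        int itemCount = 0;
        int itemSum = 0;
        StringBuilder list = new StringBuilder("[");
        for (String v : values) {
            String[] parts = v.split(",");          // parts[0] = movieId, parts[1] = rating
            itemSum += Integer.parseInt(parts[1]);
            itemCount++;
            if (itemCount > 1) {
                list.append(",");                   // separator between entries
            }
            list.append("(").append(v).append(")");
        }
        list.append("]");
        return itemCount + "," + itemSum + "," + list;
    }

    public static void main(String[] args) {
        List<String> values = new ArrayList<>();
        values.add("173,4");
        values.add("288,4");
        values.add("66,3");
        // Key and value are tab-separated, as in the job's text output.
        System.out.println("99\t" + aggregate(values));
    }
}
```

For user 99 and these three ratings, the value part would be 3,11,[(173,4),(288,4),(66,3)].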
I made a few adjustments to the code, and it now gives me exactly the expected result. Here is my new code:
imports*
public class MRS {
    public static class Map extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] s = line.split("\t");
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                IntWritable userId = new IntWritable(Integer.parseInt(token
                        .nextToken()));
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(userId, new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                result = result + "[" + s + "],";
            }
            result = result.substring(1, result.length() - 2);
            System.out.println(result);
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");
        job.setJarByClass(MRS.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
What I changed:
Driver code
job.setOutputKeyClass(IntWritable.class);
Mapper code
Mapper<LongWritable, Text, IntWritable, Text>
Reducer code
public static class Reduce extends
        Reducer<IntWritable, Text, IntWritable, Text> {
    public void reduce(IntWritable key, Iterable<Text> value, Context con)
            throws IOException, InterruptedException {
I think the problem was that the output key and output value types matched those of the Mapper class, which is why it printed the mapper output and did not even execute the reducer.
Correct me if I am wrong.
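One possible explanation, offered as a sketch rather than a confirmed diagnosis: the first version declares Reducer<Text, IntWritable, Text, Text> but its reduce method takes Iterable<Text>. With that mismatch the method is an overload, not an override, so the framework keeps calling the inherited reduce, and Hadoop's default Reducer.reduce writes every value through unchanged. The plain-Java example below imitates this without Hadoop. BaseReducer, Mismatched, Matched, and run are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverloadSketch {
    // Hypothetical stand-in for a framework base class. Its default reduce()
    // passes every value through unchanged, like an identity reducer.
    public static class BaseReducer<K, V> {
        protected void reduce(K key, Iterable<V> values, List<String> out) {
            for (V v : values) {
                out.add(key + "\t" + v);
            }
        }

        // The "framework" only ever calls the reduce(K, Iterable<V>, ...) signature.
        @SuppressWarnings("unchecked")
        public final void run(Object key, Iterable<?> values, List<String> out) {
            reduce((K) key, (Iterable<V>) values, out);
        }
    }

    // Declared with V = Integer, but reduce() takes Iterable<String>:
    // this is an overload, so run() never reaches it.
    // Adding @Override here would turn the mistake into a compile error.
    public static class Mismatched extends BaseReducer<String, Integer> {
        public void reduce(String key, Iterable<String> values, List<String> out) {
            int count = 0;
            for (String ignored : values) {
                count++;
            }
            out.add(key + "\t" + count);
        }
    }

    // Type parameters agree with the method signature, so this overrides.
    public static class Matched extends BaseReducer<String, String> {
        @Override
        public void reduce(String key, Iterable<String> values, List<String> out) {
            int count = 0;
            for (String ignored : values) {
                count++;
            }
            out.add(key + "\t" + count);
        }
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("173,4", "288,4");

        List<String> passThrough = new ArrayList<>();
        new Mismatched().run("99", values, passThrough);
        System.out.println(passThrough); // one entry per input value

        List<String> reduced = new ArrayList<>();
        new Matched().run("99", values, reduced);
        System.out.println(reduced);     // a single aggregated entry
    }
}
```

In this sketch, Mismatched emits one line per input value, which resembles the mapper-style output above, while Matched emits a single aggregated line. If the same thing happened in the first version, the fix that mattered was making the Reducer type parameters agree with the reduce signature. Annotating reduce with @Override is a cheap way to have the compiler catch such a mismatch.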