I don't understand what my MapReduce job gives me as output. I have a .csv file as input in which the districts of a city are stored together with the age of each tree in each district. In the combiner I try to get the oldest tree per district, while in my reducer I try to retrieve the district with the oldest tree in the city.
My problem is that while the reduce function gives me output values of 11, 12, 16, and 5, the cleanup function inside the reducer, which should return the last of those values (5), actually returns 9 (which is the last value my reducer analyses). I don't get what I missed. Below is what I have tried so far.
Mapper:
package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), ";");
        int i = 0;

        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if (i % 13 == 1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {
                itr.nextToken(); itr.nextToken(); itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if (Double.parseDouble(String.valueOf(annee)) > 1000) {
                    context.write(result, new IntWritable((int) Double.parseDouble(String.valueOf(annee))));
                    i += 3;
                }
            }
            i++;
        }
    }
}
Combiner:
package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;

        for (IntWritable val : values) {
            a.add(val.get());
        }

        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}
Reducer:
public class IntReducer6 extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int max = 100000;
    private int annee = 0;
    int i = 0;
    private List a = new ArrayList();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        for (IntWritable value : values) {
            annee = value.get();
        }

        if (annee < max) {
            a.add(key);
            max = annee;
            context.write((Text) a.get(i), NullWritable.get());
            i++;
        }
    }

    @Override
    // only display the character which has the largest value
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write((Text) a.get(a.size() - 1), NullWritable.get());
    }
}
Your approach to the reduce (and combiner-reduce) methods is a bit of overkill for such a simple job, as MapReduce jobs go. To be perfectly honest, this type of task doesn't seem to need a combiner at all, and it certainly cannot rely on the reducer's cleanup function in the way you expect, since cleanup is executed once for each reducer task rather than once at the end of the whole job.
The main issue with your program is that it doesn't take into account how the reduce function operates: the framework runs it through a number of instances, one for each key value, or in simpler terms the reduce function is called for every key separately. This means that for your type of job the reduce function needs to be executed only once, for all the "keys" (as we are going to see how that turns out below), in order to find the district with the oldest tree.
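To make that call pattern concrete, here is a minimal plain-Java sketch of the order in which a reducer task is driven: one reduce() call per distinct key, and a single cleanup() call only after the last key assigned to that task. This is not Hadoop code; the class name ReduceCallOrderDemo and the district/age values are made up purely for illustration.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (NOT Hadoop code) of how a reducer task is driven:
// reduce() is called once per distinct key, and cleanup() runs once per reducer
// task, only after the last key that this particular task was assigned.
public class ReduceCallOrderDemo {
    public static void main(String[] args) {
        // grouped, sorted map output, as one reducer task would receive it
        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("District A", Arrays.asList(7, 6, 11));
        grouped.put("District B", Arrays.asList(20, 17, 18));
        grouped.put("District C", Arrays.asList(10, 1, 2));

        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            // this is the equivalent of one reduce() call
            System.out.println("reduce() called for key " + group.getKey()
                    + " with values " + group.getValue());
        }
        // and only now, after every key of this task, the equivalent of cleanup()
        System.out.println("cleanup() called once for this reducer task");
    }
}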
Having that in mind, the map function should arrange the data of each row of the input .csv file in such a way that the key of every key-value pair is the same for all of the pairs (in order to have the reduce function operate on all of the rows), while the value of each pair holds the name of a district and the age of one tree. So the mappers will generate key-value pairs where the NULL value is going to be the key for all of them, and each value will be a composite value in which the district name and a particular tree age are stored, like so:
<NULL, (district, tree_age)>
As for the reduce function, it only needs to scan every value under the NULL key (aka all of the pairs) and find the maximum tree age. The final output key-value pair will then show the district with the oldest tree and that maximum tree age, like so:
<district_with_oldest_tree, max_tree_age>
To showcase my tested answer I took some liberties and simplified your program, mainly because the French(?) variable names are a bit confusing to me, and because you overcomplicated things by using legacy classes like StringTokenizer when the more recent Hadoop releases work perfectly well with the more common Java types and methods.
First, since I can't look at your input .csv file, I created my own, trees.csv, stored in a directory named trees, with the following lines in it (one column for the district and one for a tree's age):
District A; 7
District B; 20
District C; 10
District C; 1
District B; 17
District A; 6
District A; 11
District B; 18
District C; 2
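In case it helps with the setup: the driver below simply reads its input from a directory called trees, so the local trees.csv has to end up there first. The following is just an optional sketch of one way to do that with the HDFS FileSystem API; the class name UploadTrees and the local path trees.csv are my own assumptions, so point it at wherever your file actually lives.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Optional helper (a sketch, not part of the job itself): copies a local trees.csv
// into the "trees" directory that the driver below uses as its input path.
public class UploadTrees {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("trees"));                                      // input directory used by the job
        fs.copyFromLocalFile(new Path("trees.csv"), new Path("trees/trees.csv"));
    }
}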
In my program (everything is put in one file for the sake of simplicity), the @ character is used as a delimiter to separate the data inside the composite values generated by the mappers; the first sample row above, for example, becomes the composite value District A@7. The results are stored in a directory named oldest_tree. You can change all of this according to your needs or your own .csv input file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OldestTree
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <NULL, (district, tree_age)>
     */
    public static class Map extends Mapper<Object, Text, NullWritable, Text>
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            String row = value.toString();
            String[] columns = row.split("; ");    // split each row by the delimiter
            String district_name = columns[0];
            String tree_age = columns[1];

            // set NULL as key for the generated key-value pairs aimed at the reducers,
            // and set the district with each of its trees' ages as a composite value,
            // with the '@' character as a delimiter
            context.write(NullWritable.get(), new Text(district_name + '@' + tree_age));
        }
    }

    /* input:  <NULL, (district, tree_age)>
     * output: <district_with_oldest_tree, max_tree_age>
     */
    public static class Reduce extends Reducer<NullWritable, Text, Text, IntWritable>
    {
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            String district_with_oldest_tree = "";
            int max_tree_age = 0;

            // for all the values with the same (NULL) key,
            // aka all the key-value pairs...
            for(Text value : values)
            {
                // split the composite value by the '@' delimiter
                String[] splitted_values = value.toString().split("@");
                String district_name = splitted_values[0];
                int tree_age = Integer.parseInt(splitted_values[1]);

                // find the district with the oldest tree
                if(tree_age > max_tree_age)
                {
                    district_with_oldest_tree = district_name;
                    max_tree_age = tree_age;
                }
            }

            // output the district (key) with the oldest tree's age (value)
            // to the output directory
            context.write(new Text(district_with_oldest_tree), new IntWritable(max_tree_age));
        }
    }

    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("trees");
        Path output_dir = new Path("oldest_tree");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job oldesttree_job = Job.getInstance(conf, "Oldest Tree");
        oldesttree_job.setJarByClass(OldestTree.class);
        oldesttree_job.setMapperClass(Map.class);
        oldesttree_job.setReducerClass(Reduce.class);
        oldesttree_job.setMapOutputKeyClass(NullWritable.class);
        oldesttree_job.setMapOutputValueClass(Text.class);
        oldesttree_job.setOutputKeyClass(Text.class);
        oldesttree_job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(oldesttree_job, input_dir);
        FileOutputFormat.setOutputPath(oldesttree_job, output_dir);
        oldesttree_job.waitForCompletion(true);
    }
}
So the result of the program, stored in the oldest_tree directory (as seen through the Hadoop HDFS browser), is the district with the oldest tree together with that tree's age, which for the sample trees.csv above should be:
District B    20
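If you'd rather check the result from code instead of the HDFS browser, the following small sketch prints the reducer's output file. It assumes the default TextOutputFormat file name part-r-00000 inside the oldest_tree directory; the class name PrintOldestTree is mine.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Prints the job's output file (district and max tree age) straight from HDFS.
public class PrintOldestTree {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path result = new Path("oldest_tree/part-r-00000");   // default single-reducer output file

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)))) {
            String line;
            while ((line = reader.readLine()) != null)
                System.out.println(line);
        }
    }
}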