java · hadoop · mapreduce

Transferring Value To Cleanup Function of the Reducer Not Working


I don't understand what my MapReduce job gives me as output. I have a .csv file as input in which the districts of a city are stored along with the age of each tree in each district.

In the combiner I try to get the oldest tree per district, while in my reducer I try to retrieve the district with the oldest tree in the city.

My problem is that while the reduce function gives me output values of 11, 12, 16, and 5, the cleanup function inside the reducer, which should return the last of those values (5), actually returns 9 (which is the last value my reducer analyses).

I don't get what I missed.

Below is what I tried so far.

Mapper:

package com.opstty.mapper;

import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class TokenizerMapper_1_8_6 extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text result = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString(),";");
        int i = 0;
        while (itr.hasMoreTokens()) {
            String arrondissement = itr.nextToken();
            if(i%13==1 && !arrondissement.toString().equals("ARRONDISSEMENT")) {

                itr.nextToken();itr.nextToken();itr.nextToken();
                String annee = itr.nextToken();
                result.set(arrondissement);

                if(Double.parseDouble((String.valueOf(annee))) > 1000){
                    context.write(result, new IntWritable((int) Double.parseDouble((String.valueOf(annee)))));
                    i+=3;
                }
            }
            i++;
        }
    }
}

Combiner:

package com.opstty.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Compare extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List a = new ArrayList();
        int sum = 0;
        for (IntWritable val : values) {
            a.add(val.get());
        }
        Collections.sort(a);
        result.set((Integer) Collections.min(a));
        context.write(key, result);
    }
}

Reducer:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class IntReducer6 extends Reducer<Text, IntWritable, Text, NullWritable> {
    private int max = 100000;
    private int annee=0;
    int i =0;
    private  List a = new ArrayList();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {


        for (IntWritable value : values)
        {
            annee = value.get();
        }

        if(annee < max)
        {
            a.add(key);
            max = annee;
            context.write((Text) a.get(i), NullWritable.get());
            i++;
        }
    }

    @Override
    // only display the character which has the largest value
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write((Text) a.get(a.size()-1), NullWritable.get());
    }
}

Solution

  • Your approach to the reduce (and combiner) methods is a bit of overkill for such a simple task, as far as MapReduce jobs go. To be perfectly honest, this type of task doesn't seem to need a combiner at all, and it can't really rely on the reducer's cleanup function either, since cleanup is executed once for every reducer task rather than once for the whole job.

    The main issue with your program is that it doesn't take into account how the reduce function actually operates: the framework calls it once for every key, separately, so any state you keep in the reducer instance depends on the order in which the key groups happen to be processed. For your type of job the reduce function needs to run only once, over all of the data (for all the "keys", as we'll see below), in order to find the district with the oldest tree.
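
    To see why the cleanup prints something unexpected, it helps to look at what the framework does with a Reducer. The following is a simplified sketch of Hadoop's default Reducer.run() loop (not code you need to write yourself): reduce() fires once per distinct key, while setup() and cleanup() fire only once per reducer task.

    // simplified sketch of org.apache.hadoop.mapreduce.Reducer.run()
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);                 // once per reducer task
        while (context.nextKey()) {     // one iteration per distinct key
            reduce(context.getCurrentKey(), context.getValues(), context);
        }
        cleanup(context);               // once per reducer task, after the last key
    }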

    With that in mind, the map function should arrange the data of each row of the input .csv file so that the key of every key-value pair is the same (in order to have a single reduce call operate on all of the rows), while the value of each pair holds both the district name and a tree's age. So the mappers will generate key-value pairs where the NULL value is the key for all of them, and each value is a composite holding a district name and a particular tree's age, like so:

    <NULL, (district, tree_age)>

    As for the reduce function, it only needs to scan every value under the NULL key (i.e. all of the pairs) and find the maximum tree age. The final output key-value pair will then hold the district with the oldest tree and that maximum age, like so:

    <district_with_oldest_tree, max_tree_age>

    To showcase my tested answer I took some liberties to simplify your program, mainly because the French(?)-named variables are a bit confusing to me, and because things were overcomplicated by using legacy classes like StringTokenizer where more common Java idioms such as String.split work just as well in recent Hadoop releases.

    First, since I can't see your input .csv file, I created my own, trees.csv, stored in a directory named trees, with the following lines in it (one column for the district and one for a tree's age):

    District A; 7
    District B; 20
    District C; 10
    District C; 1
    District B; 17
    District A; 6
    District A; 11
    District B; 18
    District C; 2
    

    In my (all-put-in-one-file-for-the-sake-of-simplicity) program, the @ character is used as a delimiter inside the composite values generated by the mappers, and the results are stored in a directory named oldest_tree. You can change all of this according to your needs or your own .csv input file.
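
    For instance, with the sample trees.csv above, the pairs reaching the reducer would look roughly like this (the NULL key is shown as (null); this is just the intermediate data, not program output):

    (null)    District A@7
    (null)    District B@20
    (null)    District C@10
    (null)    District C@1
    (null)    District B@17
    (null)    District A@6
    (null)    District A@11
    (null)    District B@18
    (null)    District C@2

    The full program is the following: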

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class OldestTree
    {
        /* input:  <byte_offset, line_of_dataset>
         * output: <NULL, (district, tree_age)>
         */
        public static class Map extends Mapper<Object, Text, NullWritable, Text> 
        {
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
            {
                String row = value.toString();
    
                String[] columns = row.split("; ");     // split each row by the delimiter
                String district_name = columns[0];
                String tree_age = columns[1];
    
                // set NULL as key for the generated key-value pairs aimed at the reducers
                // and set the district with each of its trees age as a composite value,
                // with the '@' character as a delimiter
                context.write(NullWritable.get(), new Text(district_name + '@' + tree_age));
            }
        }
    
        /* input: <NULL, (district, tree_age)>
         * output: <district_with_oldest_tree, max_tree_age>
         */
        public static class Reduce extends Reducer<NullWritable, Text, Text, IntWritable>
        {
            public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
            {
                String district_with_oldest_tree = "";
                int max_tree_age = 0;
    
                // for all the values with the same (NULL) key,
                // aka all the key-value pairs...
                for(Text value : values)
                {
                    // split the composite value by the '@' delimiter
                    String[] splitted_values = value.toString().split("@");
                    String district_name = splitted_values[0];
                    int tree_age = Integer.parseInt(splitted_values[1]);
    
                    // find the district with the oldest tree
                    if(tree_age > max_tree_age)
                    {
                        district_with_oldest_tree = district_name;
                        max_tree_age = tree_age;
                    }
                }
    
                // output the district (key) with the oldest tree's year of planting (value)
                // to the output directory
                context.write(new Text(district_with_oldest_tree), new IntWritable(max_tree_age));
            }
        }
    
        public static void main(String[] args) throws Exception
        {
            // set the paths of the input and output directories in the HDFS
            Path input_dir = new Path("trees");
            Path output_dir = new Path("oldest_tree");
    
            // in case the output directory already exists, delete it
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            if(fs.exists(output_dir))
                fs.delete(output_dir, true);
    
            // configure the MapReduce job
            Job oldesttree_job = Job.getInstance(conf, "Oldest Tree");
            oldesttree_job.setJarByClass(OldestTree.class);
            oldesttree_job.setMapperClass(Map.class);
            oldesttree_job.setReducerClass(Reduce.class);    
            oldesttree_job.setMapOutputKeyClass(NullWritable.class);
            oldesttree_job.setMapOutputValueClass(Text.class);
            oldesttree_job.setOutputKeyClass(Text.class);
            oldesttree_job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(oldesttree_job, input_dir);
            FileOutputFormat.setOutputPath(oldesttree_job, output_dir);
            oldesttree_job.waitForCompletion(true);
        }
    }
    

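    One optional addition (not in the listing above, just a suggestion): since every pair shares the same NULL key, all of the data ends up in a single reduce group anyway, but explicitly requesting one reduce task keeps the job from creating empty part-* files on clusters configured with several reducers:

    // optional: force a single reduce task so the single NULL-keyed
    // group produces exactly one (non-empty) output file
    oldesttree_job.setNumReduceTasks(1);
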
    So the result of the program, stored in the oldest_tree directory (as seen through the Hadoop HDFS browser), is the district with the oldest tree together with that tree's age; for the sample input above that is District B with 20:

    [screenshot: contents of the oldest_tree output directory in the HDFS file browser]