Search code examples
hadoopbigdatasecondary-sort

Hadoop Secondary sort - to use or not use


I have accident input data from a traffic data analysis. Some of the columns are:

Accident Id, Accident Date, Day of week

1, 1/1/1979, 5 (Thursday)

2, 1/2/1979, 6 (Friday)

.......

3, 1/1/1980, 0 (Sunday)

I am trying to solve following :

Find number of accidents per year per day

so output should look like :

where Key is (Year, Day of week)

and the value is the number of accidents on that day. Here, line 1 represents year = 1979, day = Sunday, and number of accidents = 500, and so on.

1979,1     500

1979,2    1500

1979,3    2500

1979,4    3500

1979,5    4500

1979,6    5500

1979,7    6500

1980,1     500

1980,2    1500

1980,3    2500

1980,4    3500

1980,5    4500

In this scenario, I am trying to solve it using the secondary sort method. Is that the correct way to solve this problem?

If secondary sort is the correct way, it's not working for me. Here are the key class, the mapper, and the reducer. My output doesn't come out as expected. Please help.

/**
 * Composite MapReduce key made of a year and a day of week, both stored as
 * {@link Text}.
 *
 * <p>Keys order by year first and then by day, both ascending, so that output
 * sorted by this key lists the days of each year in increasing order.
 * {@link Text} comparison is byte-wise (lexicographic), which matches numeric
 * order only while all years have the same number of digits and days are a
 * single digit.
 *
 * <p>The two {@link Text} fields are owned by this object and are never null:
 * the constructor and setters copy the contents of their arguments instead of
 * keeping a reference, so {@link #readFields(DataInput)} and
 * {@link #write(DataOutput)} cannot hit a null field and later changes to the
 * caller's {@link Text} do not alter the key.
 */
public class DOW implements WritableComparable<DOW> {
    private final Text year = new Text();
    private final Text day = new Text();

    /** Creates an empty key; its fields are filled in by {@link #readFields(DataInput)} or the setters. */
    public DOW() {
    }

    /**
     * Creates a key holding copies of the given values.
     *
     * @param year the year; must not be null
     * @param day  the day of week; must not be null
     */
    public DOW(Text year, Text day) {
        this.year.set(year);
        this.day.set(day);
    }

    /** @return the year component (the key's own instance, never null) */
    public Text getYear() {
        return this.year;
    }

    /**
     * Replaces the year with a copy of the given value.
     *
     * @param year the new year; must not be null
     */
    public void setYear(Text year) {
        this.year.set(year);
    }

    /** @return the day-of-week component (the key's own instance, never null) */
    public Text getDay() {
        return this.day;
    }

    /**
     * Replaces the day with a copy of the given value.
     *
     * @param day the new day of week; must not be null
     */
    public void setDay(Text day) {
        this.day.set(day);
    }

    /** Reads the year and then the day, in the same order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        year.readFields(in);
        day.readFields(in);
    }

    /** Writes the year followed by the day. */
    @Override
    public void write(DataOutput out) throws IOException {
        year.write(out);
        day.write(out);
    }

    /**
     * Orders keys by year, then by day, both ascending.
     *
     * @param o the key to compare against
     * @return a negative, zero or positive value as this key sorts before,
     *         equal to or after {@code o}
     */
    @Override
    public int compareTo(DOW o) {
        int cmp = year.compareTo(o.year);
        if (cmp != 0) {
            return cmp;
        }
        // Ascending by day: the previous reversed comparison listed the days
        // of each year from highest to lowest.
        return day.compareTo(o.day);
    }

    /** @return the key rendered as {@code year,day} */
    @Override
    public String toString() {
        return year + "," + day;
    }

    /** Two keys are equal when both their year and their day are equal. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DOW)) {
            return false;
        }
        DOW other = (DOW) o;
        return year.equals(other.year) && day.equals(other.day);
    }

    /** Combines the hash codes of year and day, consistently with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        return year.hashCode() * 163 + day.hashCode();
    }
}
public class AccidentDowDemo extends Configured implements Tool {

    public static class DOWMapper extends Mapper<LongWritable, Text, DOW, IntWritable> {
        private static final Logger sLogger = Logger.getLogger(DOWMapper.class);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {

            if (value.toString().contains(",")) {
                String[] array = value.toString().split(",");
                if (!array[9].equals("Date")) {
                    Date dt = null;
                    try {
                        dt = new SimpleDateFormat("dd/mm/yyyy").parse(array[9]);

                    } catch (ParseException e) {
                        // TODO Auto-generated catch block

                        e.printStackTrace();
                    }

                    int year = dt.getYear();

                    int day = Integer.parseInt(array[10].toString());
                                        context.write(new DOW(new Text(Integer.toString(year)),
                            new Text(Integer.toString(day))),
                            new IntWritable(1));
                }
            }
        };
    }

    public static class DOWReducer extends Reducer<DOW, IntWritable, DOW, IntWritable> {
        private static final Logger sLogger = Logger
                .getLogger(DOWReducer.class);

        @Override
        protected void reduce(DOW key, Iterable<IntWritable> values,
                Context context) throws java.io.IOException,
                InterruptedException {
            int count = 0;
            sLogger.info("key =" + key);
            for (IntWritable x : values) {
                int val = Integer.parseInt(x.toString());
                count = count + val;
            }
            context.write(key, new IntWritable(count));
        };
    }

    public static class FirstPartitioner extends Partitioner<DOW, IntWritable> {

        @Override
        public int getPartition(DOW key, IntWritable value, int numPartitions) {
            // TODO Auto-generated method stub

            return Math.abs(Integer.parseInt(key.getYear().toString()) * 127)
                    % numPartitions;
        }
    }

    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(DOW.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            // TODO Auto-generated method stub

            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            int cmp = ip1.getYear().compareTo(ip2.getYear());
            if (cmp == 0) {
                cmp = -1 * ip1.getDay().compareTo(ip2.getDay());
            }
            return cmp;
        }
    }

    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(DOW.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {

            // TODO Auto-generated method stub
            DOW ip1 = (DOW) w1;
            DOW ip2 = (DOW) w2;
            return ip1.getYear().compareTo(ip2.getYear());
        }
    }
}

Solution

  • If you need to basically simulate

    select year, day, count(*) as totalPerDay from DATA group by year, day
    

    then you do not need secondary sort.

    But if you need to produce something like a CUBE, where you need to calculate the total per year and the total per week in one MR job, then secondary sort is the way to go.