Partitioner is not working correctly


I am trying to code a MapReduce scenario in which I have created some user ClickStream data in the form of JSON. After that I wrote a Mapper class to fetch the required data from the file. My mapper code is:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.json.JSONException;
import org.json.JSONObject;

public class ClickStreamMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // JSON field names used in the ClickStream records
    private final static String URL = "u";
    private final static String Country_Code = "c";
    private final static String Known_User = "nk";
    private final static String Session_Start_time = "hc";
    private final static String User_Id = "user";
    private final static String Event_Id = "event";

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String aJSONRecord = value.toString();
        try {
            // Build a comma-separated record from the selected JSON fields
            JSONObject aJSONObject = new JSONObject(aJSONRecord);
            StringBuilder aOutputString = new StringBuilder();
            aOutputString.append(aJSONObject.get(User_Id).toString() + ",");
            aOutputString.append(aJSONObject.get(Event_Id).toString() + ",");
            aOutputString.append(aJSONObject.get(URL).toString() + ",");
            aOutputString.append(aJSONObject.get(Known_User) + ",");
            aOutputString.append(aJSONObject.get(Session_Start_time) + ",");
            aOutputString.append(aJSONObject.get(Country_Code) + ",");
            context.write(new Text(aOutputString.toString()), key);
            System.out.println(aOutputString.toString());
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }
}

And my reducer code is:

public class ClickStreamReducer extends Reducer<Text, LongWritable, Text, Text> {

    public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        // The key already carries the whole record; emit it with an empty value
        String aString = key.toString();
        context.write(new Text(aString.trim()), new Text(""));
    }
}

And my partitioner code is:

public class ClickStreamPartitioner extends Partitioner<Text, LongWritable> {

    private static final String Country_code_Us = "US";

    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // Send US records to reducer 0 and everything else to reducer 1
        String aRecord = key.toString();
        if (aRecord.contains(Country_code_Us)) {
            return 0;
        } else {
            return 1;
        }
    }
}

And here is my driver code:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Click Stream Analyzer");
    // Two reducers: partition 0 for US records, partition 1 for the rest
    job.setNumReduceTasks(2);
    job.setJarByClass(ClickStreamDriver.class);
    job.setMapperClass(ClickStreamMapper.class);
    job.setReducerClass(ClickStreamReducer.class);
    job.setPartitionerClass(ClickStreamPartitioner.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Here I am trying to partition my data on the basis of country code, but it is not working: every record is sent to a single reducer output file, and I think it is the file other than the one created for the US partition.
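
One way to sanity-check the partitioner in isolation is to call getPartition directly on keys shaped like the mapper's output. This is a hypothetical standalone check, not part of the job; the sample field values are made up:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Hypothetical standalone check of the partitioner logic, outside Hadoop
public class PartitionerCheck {
    public static void main(String[] args) {
        ClickStreamPartitioner p = new ClickStreamPartitioner();
        // Keys shaped like the mapper output: comma-joined fields ending
        // with the country code and a trailing comma
        Text usKey = new Text("user1,event1,http://example.com,false,12345,US,");
        Text otherKey = new Text("user2,event2,http://example.com,true,67890,IN,");
        LongWritable dummy = new LongWritable(0);
        System.out.println(p.getPartition(usKey, dummy, 2));    // expected: 0
        System.out.println(p.getPartition(otherKey, dummy, 2)); // expected: 1
    }
}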

One more thing: when I look at the output of the mappers, there is some extra space added at the end of each record.

Please suggest if I am making any mistake here.


Solution

  • I used NullWritable and it works. Now I can see the records getting partitioned into different files. Since I was using LongWritable as a null value instead of NullWritable, a space was added at the end of each line; because of that, US was listed as "US " and the partitioner was not able to divide the records.
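
For reference, here is a minimal sketch of what the NullWritable variant could look like (assuming org.apache.hadoop.io.NullWritable; the class names follow the driver above, and only the reducer is shown in full):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer emitting NullWritable: TextOutputFormat then writes the key
// alone, with no trailing separator or empty value after each record
public class ClickStreamReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(key.toString().trim()), NullWritable.get());
    }
}

The matching changes elsewhere would be: the mapper writes context.write(new Text(aOutputString.toString()), NullWritable.get()), the partitioner's value type parameter becomes NullWritable, and the driver calls job.setMapOutputValueClass(NullWritable.class) and job.setOutputValueClass(NullWritable.class).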