Tags: java, hadoop, mapreduce, hbase, reduce-reduce-conflict

How to solve casting error from IntWritable to Mutation? Map Reduce HBase


I have the following error when I try to pass an IntWritable from my mapper to my reducer:

INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation

This is my mapper:

public class testMapper extends TableMapper<Object, Object>
{

    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {

        try
        {
            // get rowKey and convert it to string
            String inKey = new String(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to
            // string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit date and sales values
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));

        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

This is the reducer:

public class testReducer extends TableReducer<Object, Object, Object>
{

    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {

            int sum = 0;
            // loop through the different sales values and add them to sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }

            // create hbase put with rowkey as date

            Put insHBase = new Put(key.get());
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

and the driver:

public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));

        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);

        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);

        job.waitForCompletion(true);
    }
}

Solution

  • context.write(null, insHBase);

    This line itself is fine: writing the Put out to the context and letting HBase take charge of storing it is exactly the intended workflow. The error message tells you what is actually reaching HBase: an IntWritable, where a Mutation is expected (your Put extends Mutation).

    The reason is that your reduce method is never called. testReducer is declared as TableReducer<Object, Object, Object>, so the method Hadoop dispatches to is reduce(Object, Iterable, Context); your reduce(ImmutableBytesWritable, Iterable<IntWritable>, Context) is a separate overload, not an override. Hadoop therefore falls back to the default identity reduce, which forwards the mapper's IntWritable values straight to the table output format, where the cast to Mutation fails.

    The rest of the workflow is right: you configure the output table in the job setup (initTableReducerJob already does this) and then simply write the Puts out to the context. You shouldn't have to do any manual put calls against the table in your reducer.