I have the following error when I try to pass an IntWritable from my mapper to my reducer:
INFO mapreduce.Job: Task Id : attempt_1413976354988_0009_r_000000_1, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.hbase.client.Mutation
This is my mapper:
public class testMapper extends TableMapper<Object, Object>
{
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException, InterruptedException
    {
        try
        {
            // get rowKey and convert it to string
            String inKey = new String(rowKey.get());
            // set new key having only date
            String oKey = inKey.split("#")[0];
            // get sales column in byte format first and then convert it to
            // string (as it is stored as string from hbase shell)
            byte[] bSales = columns.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("sales"));
            String sSales = new String(bSales);
            Integer sales = new Integer(sSales);
            // emit date and sales values
            context.write(new ImmutableBytesWritable(oKey.getBytes()), new IntWritable(sales));
        }
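The byte/string conversions this mapper performs can be checked in isolation, without HBase. A minimal sketch, assuming row keys shaped like date#suffix (the example key below is made up) and sales values stored as decimal text; MapperLogicSketch and its methods are hypothetical names. Decoding explicitly as UTF-8, which is what HBase's Bytes.toString does, avoids the platform-default charset that new String(byte[]) relies on, and Integer.parseInt replaces the Integer(String) constructor, which is deprecated since Java 9.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class: the same conversions as testMapper.map, minus HBase.
public class MapperLogicSketch {
    // Row key bytes -> text -> everything before the first '#' (the date part).
    public static String dateFromRowKey(byte[] rowKey) {
        String inKey = new String(rowKey, StandardCharsets.UTF_8);
        return inKey.split("#")[0];
    }

    // Sales cell bytes (decimal text typed in the hbase shell) -> int.
    // Result.getValue returns null for a missing cell; this sketch assumes it exists.
    public static int parseSales(byte[] bSales) {
        return Integer.parseInt(new String(bSales, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] rowKey = "2014-10-22#0001".getBytes(StandardCharsets.UTF_8);
        byte[] sales = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(dateFromRowKey(rowKey) + " -> " + parseSales(sales));
    }
}
```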
This is the reducer:
public class testReducer extends TableReducer<Object, Object, Object>
{
    public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        try
        {
            int sum = 0;
            // loop through the sales values and add each one to sum
            for (IntWritable sales : values)
            {
                Integer intSales = new Integer(sales.toString());
                sum += intSales;
            }
            // create hbase put with rowkey as date
            Put insHBase = new Put(key.get());
            // insert sum value to hbase
            insHBase.add(Bytes.toBytes("cf1"), Bytes.toBytes("sum"), Bytes.toBytes(sum));
            // write data to Hbase table
            context.write(null, insHBase);
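Two details of this reducer are easy to check in plain Java. IntWritable.get() returns the wrapped int directly, so the toString()/Integer round trip is unnecessary; and Bytes.toBytes(int) stores the sum as a 4-byte big-endian binary value, not as decimal text like the sales column it was read from. A hypothetical, HBase-free sketch of both, using ByteBuffer (big-endian by default) to reproduce that byte layout:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class mirroring the arithmetic and encoding in testReducer.reduce.
public class ReducerLogicSketch {
    // Sum of the per-date sales values (in Hadoop, each would be sales.get()).
    public static int sum(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // 4-byte big-endian layout, the same bytes Bytes.toBytes(int) produces.
    public static byte[] encodeAsInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decimal text, the form Bytes.toBytes(String.valueOf(sum)) would store.
    public static byte[] encodeAsText(int value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int total = sum(List.of(3, 4));
        System.out.println(Arrays.toString(encodeAsInt(total)));  // [0, 0, 0, 7]
        System.out.println(Arrays.toString(encodeAsText(total))); // [55]
    }
}
```

In the hbase shell the binary form shows up as \x00\x00\x00\x07 and has to be read back with Bytes.toInt; the text form reads as 7, matching how the sales values were entered.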
And the driver:
public class testDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // define scan and define column families to scan
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf1"));
        Job job = Job.getInstance(conf);
        job.setJarByClass(testDriver.class);
        // define input hbase table
        TableMapReduceUtil.initTableMapperJob("test1", scan, testMapper.class, ImmutableBytesWritable.class, IntWritable.class, job);
        // define output table
        TableMapReduceUtil.initTableReducerJob("test2", testReducer.class, job);
        job.waitForCompletion(true);
    }
}
context.write(null, insHBase);
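The Put is not the problem. TableOutputFormat expects every output value to be a Mutation (Put and Delete both extend it), so writing insHBase to the context is exactly what it wants. The exception says the reverse happened: an IntWritable reached the output format and could not be cast to Mutation.
That IntWritable comes straight from your mapper, because your reduce method is never called. testReducer extends TableReducer<Object, Object, Object>, so the method Hadoop invokes is reduce(Object, Iterable<Object>, Context). Your reduce(ImmutableBytesWritable, Iterable<IntWritable>, Context) has different parameter types, which makes it an overload, not an override. The inherited Reducer.reduce runs instead; it simply writes every (key, value) pair through unchanged, so the mapper's IntWritable values land in TableOutputFormat and the cast fails.
Declare the real types so the signatures line up, and put @Override on map and reduce so the compiler rejects any mismatch:
public class testMapper extends TableMapper<ImmutableBytesWritable, IntWritable>
public class testReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>
Keep writing the Put from the reducer. TableMapReduceUtil.initTableReducerJob has already configured TableOutputFormat to store it in test2; the Put is how you tell it which row, column and value to write.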
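The overload-versus-override trap behind this kind of ClassCastException can be reproduced without Hadoop. In this stdlib-only sketch, BaseReducer is a hypothetical stand-in for org.apache.hadoop.mapreduce.Reducer whose default reduce forwards every value unchanged; LooseReducer mirrors testReducer's Object type arguments, while TypedReducer uses matching ones plus @Override. All class and method names here are illustrative. Calling reduce through the base type, as the framework does, only reaches the subclass method in the second case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Reducer: the default
// reduce() passes every value through unchanged, cast to the output types.
class BaseReducer<KIN, VIN, KOUT, VOUT> {
    final List<String> output = new ArrayList<>();

    @SuppressWarnings("unchecked")
    void reduce(KIN key, Iterable<VIN> values) {
        for (VIN value : values) {
            write((KOUT) key, (VOUT) value);
        }
    }

    void write(KOUT key, VOUT value) {
        output.add(key + "=" + value);
    }
}

// Mirrors testReducer: with Object type arguments the inherited method is
// reduce(Object, Iterable<Object>), so this method is only an overload.
class LooseReducer extends BaseReducer<Object, Object, Object, Object> {
    void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        write(key, "sum:" + sum);
    }
}

// Matching type arguments: this is a real override, and @Override makes the
// compiler reject it if the parameter types ever drift apart again.
class TypedReducer extends BaseReducer<String, Integer, String, String> {
    @Override
    void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        write(key, "sum:" + sum);
    }
}

public class OverrideDemo {
    // Calls reduce() through the base type, the way the framework does.
    public static <K, V> List<String> runAsFramework(BaseReducer<K, V, ?, ?> reducer, K key, List<V> values) {
        reducer.reduce(key, values);
        return reducer.output;
    }

    public static void main(String[] args) {
        List<Object> raw = List.of(3, 4);
        System.out.println(runAsFramework(new LooseReducer(), "2014-10-22", raw));
        System.out.println(runAsFramework(new TypedReducer(), "2014-10-22", List.of(3, 4)));
    }
}
```

Running main prints [2014-10-22=3, 2014-10-22=4] for the loose version, where the raw values pass straight through exactly as the IntWritables did, and [2014-10-22=sum:7] for the typed one, where the custom reduce actually runs.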