Tags: java, hadoop, mapreduce, hbase

HBase bulk load: append data instead of overwriting it


Actually I'm loading data into HBase with the help of MapReduce and bulk load, which I implemented in Java. Basically I created a mapper that simply reads some bytes from a file and creates a Put, and I use HFileOutputFormat2.configureIncrementalLoad (full code at the end of the question) for the reduce step. The output is then written into HBase with LoadIncrementalHFiles.doBulkLoad. This all works pretty well, but of course it overwrites old values in HBase. So I'm searching for a way to append the data, like the Append function from the API does. Thanks for reading, and hopefully some of you have an idea that can help me :)
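
The mapper itself isn't shown in the full code below; simplified, it looks roughly like this (the qualifier name and the two-field line layout here are placeholders, not my exact code):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private String separator;
    private byte[] columnFamily1;

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        separator = conf.get("data.seperator");
        columnFamily1 = Bytes.toBytes(conf.get("COLUMN_FAMILY_1"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Placeholder layout: first field is the row key, second the cell value.
        String[] fields = value.toString().split(separator);
        byte[] rowKey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowKey);
        put.addColumn(columnFamily1, Bytes.toBytes("data"), Bytes.toBytes(fields[1]));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}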

public int run(String[] args) throws Exception {
    int result = 0;
    String outputPath = args[1];
    Configuration configuration = getConf();
    configuration.set("data.seperator", DATA_SEPERATOR);
    configuration.set("hbase.table.name", TABLE_NAME);
    configuration.set("COLUMN_FAMILY_1", COLUMN_FAMILY_1);
    configuration.set("COLUMN_FAMILY_2", COLUMN_FAMILY_2);

    Job job = Job.getInstance(configuration);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + TABLE_NAME);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapper.class);

    FileInputFormat.addInputPaths(job, args[0]);
    // Remove any leftover output from a previous run.
    FileSystem.getLocal(getConf()).delete(new Path(outputPath), true);
    HFileOutputFormat2.setOutputPath(job, new Path(outputPath));
    job.setMapOutputValueClass(Put.class);
    Connection c = ConnectionFactory.createConnection(configuration);
    Table t = c.getTable(TableName.valueOf(TABLE_NAME));
    RegionLocator rl = c.getRegionLocator(TableName.valueOf(TABLE_NAME));
    // Configures partitioner, output format and reducer (PutSortReducer)
    // so the job produces HFiles matching the table's region boundaries.
    HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
    System.out.println("start");
    job.waitForCompletion(true);
    if (job.isSuccessful()) {
        HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
    } else {
        result = -1;
    }
    return result;
}



public static void doBulkLoad(String pathToHFile, String tableName) {
    try {
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.child.java.opts", "-Xmx1g");
        HBaseConfiguration.addHbaseResources(configuration);
        LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(configuration);

        // Old HTable-based variant, deprecated in current APIs:
        //HTable hTable = new HTable(configuration, tableName);
        //loadFiles.doBulkLoad(new Path(pathToHFile), hTable);

        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf(tableName));
        Admin admin = connection.getAdmin();
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName));
        // Move the generated HFiles into the table's regions:
        // path, admin, table, region locator.
        loadFiles.doBulkLoad(new Path(pathToHFile), admin, table, regionLocator);

        System.out.println("Bulk Load Completed..");
    } catch (Exception exception) {
        exception.printStackTrace();
    }
}

As requested in the comments, I'm adding the output of the table description here, because the table was created by the Python happybase API and I'm not sure which option flags the API sets by default...

{NAME => '0', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => '1', BLOOMFILTER => 'NONE', VERSIONS => '3', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'false', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}


Solution

  • In HFileOutputFormat2.configureIncrementalLoad() (http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.html#line.408), PutSortReducer is used as the reducer.

    In PutSortReducer.reduce() (http://atetric.com/atetric/javadoc/org.apache.hbase/hbase-server/1.2.4/src-html/org/apache/hadoop/hbase/mapreduce/PutSortReducer.html), the KeyValues are stored in a TreeSet whose comparator compares keys only (row, family, qualifier, timestamp), not values. That is why, when two cells end up with an identical key, only one value survives.

    To keep both values, you can create your own reducer based on PutSortReducer that keeps them, and set it after configureIncrementalLoad (see the sketch following the snippet below):

    HFileOutputFormat2.configureIncrementalLoad(job, t, rl);
    job.setReducerClass(MyReducer.class);
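
A minimal sketch of such a MyReducer, assuming the HBase 1.x client API: it resolves key collisions by shifting timestamps, so each value survives as a separate version. Unlike the real PutSortReducer it buffers the whole row in memory and omits the putsortreducer.row.threshold flush logic. Note that setReducerClass must be called after configureIncrementalLoad, because the latter installs PutSortReducer itself.

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer
        extends Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<Put> puts, Context context)
            throws IOException, InterruptedException {
        // Same sorted structure as PutSortReducer: orders cells into HFile order.
        TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
        for (Put p : puts) {
            for (List<Cell> cells : p.getFamilyCellMap().values()) {
                for (Cell cell : cells) {
                    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                    // add() returns false when a cell with an equal key
                    // (row/family/qualifier/timestamp) is already present --
                    // the case where PutSortReducer silently drops a value.
                    // Shift the timestamp until the key is unique so both
                    // values survive as separate versions.
                    while (!map.add(kv)) {
                        kv = new KeyValue(CellUtil.cloneRow(kv),
                                CellUtil.cloneFamily(kv),
                                CellUtil.cloneQualifier(kv),
                                kv.getTimestamp() - 1,
                                CellUtil.cloneValue(kv));
                    }
                }
            }
        }
        for (KeyValue kv : map) {
            context.write(row, kv);
        }
    }
}

Since the two values then end up as different versions of the same cell, the column family has to retain more than one version. The table description in the question shows VERSIONS => '3', so up to three versions are kept.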