Tags: hadoop, hbase, hadoop2, snappy, hfile

Run LoadIncrementalHFiles from Java client


I want to run the equivalent of hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/myuser/map_data/hfiles mytable from my Java client code.

When I run the application I get the following exception:

org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file webhdfs://myserver.de:50070/user/myuser/map_data/hfiles/b/b22db8e263b74a7dbd8e36f9ccf16508
    at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:477)
    at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:520)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplit(LoadIncrementalHFiles.java:632)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:549)
    at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles$3.call(LoadIncrementalHFiles.java:546)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
    at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:65)
    at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:193)
    at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
    at org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:327)
    at org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:422)
    at org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
    at org.apache.hadoop.hbase.io.hfile.HFileBlock.unpack(HFileBlock.java:529)
    at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlock(HFileBlock.java:1350)
    at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader$1.nextBlockWithBlockType(HFileBlock.java:1356)
    at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.<init>(HFileReaderV2.java:149)
    at org.apache.hadoop.hbase.io.hfile.HFileReaderV3.<init>(HFileReaderV3.java:77)
    at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:467)
    ... 8 more

Running the hbase ... command above from the console on my Hadoop server works perfectly, but when I try to do the same load from my Java code using the HBase/Hadoop client libraries, it fails with the exception above.

Here is a code snippet:

public static void main(String[] args) {

    try {
        Configuration conf = loginFromKeyTab("REALM.DE", "server.de", "user", "C:/user.keytab");
        conf.set("fs.webhdfs.impl", org.apache.hadoop.hdfs.web.WebHdfsFileSystem.class.getName());
        conf.set("hbase.zookeeper.quorum", "server1.de,server2.de,server3.de");
        conf.set("zookeeper.znode.parent", "/hbase-secure");
        conf.set("hbase.master.kerberos.principal", "hbase/[email protected]");
        conf.set("hbase.regionserver.kerberos.principal", "hbase/[email protected]");
        conf.set("hbase.security.authentication", "kerberos");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("mytable"));

        RegionLocator locator = connection.getRegionLocator(table.getName());

        Job job = Job.getInstance(conf, "Test Bulk Load"); 

        //HFileOutputFormat2.configureIncrementalLoad(job, table, locator);     
        //Configuration conf2 = job.getConfiguration();

        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(HDFS_PATH), connection.getAdmin(), table, locator);
    } catch(Exception e) {
        e.printStackTrace();
    }
}
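
The root cause at the bottom of the stack trace is that the libhadoop picked up by my client JVM was built without snappy support. To see what the client JVM can actually load, here is a minimal check using the standard Hadoop NativeCodeLoader / SnappyCodec status methods (the class name is just for illustration):

import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeSnappyCheck {

    public static void main(String[] args) {
        // true only if a native libhadoop could be loaded by this JVM
        System.out.println("native libhadoop loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        // true only if that libhadoop was also built with snappy support
        System.out.println("snappy supported:        " + SnappyCodec.isNativeCodeLoaded());
    }
}

On the Hadoop node both lines should print true (the CLI bulk load works there); on my client the second one is necessarily false, otherwise the exception above would not be thrown.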

Do I need to add a dependency to my project? If so, which one, where do I get it, and in which version?

I'm working with HDP 2.5, which contains HBase 1.1.2 and Hadoop 2.7.3.


Solution

  • I found another solution for my issue: instead of using the LoadIncrementalHFiles class directly in my code, my Java program (running directly on the Hadoop node) starts a Process that runs the hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles command for me!

    Here is the code snippet of my solution:

    TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_OUTPUT_PATH), conf);        // The HDFS_OUTPUT_PATH directory contains the HFile sub-directories (a sketch of getHFileDirectories is shown below)
    
    for(String hFileDir : subDirs) {
        String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
    
        String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, "mytable"};       // Important: pass each parameter as a separate array element!
        ProcessBuilder pb = new ProcessBuilder(execCode);
        pb.redirectErrorStream(true);
        final Process p = pb.start();
    
        new Thread(new Runnable() {
            public void run() {
                BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
                String line = null; 
    
                try {
                    while ((line = input.readLine()) != null)
                        System.out.println(line);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    
        p.waitFor();
    
        int exitCode = p.exitValue();
        System.out.println(" ==> Exit Code: " + exitCode);
    }
    
    System.out.println("Finished");
    

    If somebody has another solution (e.g. how to use the LoadIncrementalHFiles class directly in code), let me know. Thank you!
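
    For reference, getHFileDirectories is my own small utility, not a Hadoop/HBase API; a minimal sketch of how it can look (it just collects the immediate sub-directory names of the output path, e.g. one per column family like the "b" directory in the exception above, using org.apache.hadoop.fs.FileSystem and FileStatus):

    private static TreeSet<String> getHFileDirectories(Path outputPath, Configuration conf) throws IOException {
        TreeSet<String> dirs = new TreeSet<String>();
        FileSystem fs = outputPath.getFileSystem(conf);         // resolves the file system (e.g. hdfs:// or webhdfs://) from the path
        for (FileStatus status : fs.listStatus(outputPath)) {
            if (status.isDirectory()) {
                dirs.add(status.getPath().getName());           // sub-directory name only, e.g. "b"
            }
        }
        return dirs;
    }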