Tags: java, hadoop, apache-spark, hbase, hadoop-yarn

Spark Yarn Cluster connection to Hbase error


I have an app that parses VCF files and inserts the data into HBase. It runs with no issue when I use Apache Spark with a local master, but when I run it on a Spark YARN cluster it fails with the following:

17/03/31 10:36:09 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:10 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:11 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:12 INFO yarn.Client: Application report for application_1490344846293_0020 (state: FINISHED)
17/03/31 10:36:12 INFO yarn.Client: 
     client token: N/A
     diagnostics: User class threw exception: java.lang.RuntimeException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
     ApplicationMaster host: 192.168.0.14
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1490956367991
     final status: FAILED
     tracking URL: http://master1:8088/proxy/application_1490344846293_0020/
     user: ubuntu
Exception in thread "main" org.apache.spark.SparkException: Application application_1490344846293_0020 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1167)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1213)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/03/31 10:36:12 INFO util.ShutdownHookManager: Shutdown hook called
17/03/31 10:36:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e6867ef3-fad4-424a-b6d3-f79f48bd65ea

I use the following code to connect to HBase:

    package com.mycompany.app;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.spark.api.java.function.VoidFunction;

    import java.io.IOException;

    class HbaseAppender implements VoidFunction<Put> {

        private final String TABLE_NAME = "data";
        private final String COLUMN_FAMILY_NAME = "v_data";

        private static HTable hTable;

        public HbaseAppender() {
            init();
        }

        // method used to establish the connection to HBase
        private void init() {
            TableName tableName;
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set("hbase.zookeeper.quorum", "master1");

            try {
                Connection connection = ConnectionFactory.createConnection(hconf);

                Admin admin = connection.getAdmin();
                tableName = TableName.valueOf(TABLE_NAME);
                if (!admin.tableExists(tableName)) {
                    HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(COLUMN_FAMILY_NAME);
                    HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                    hTableDescriptor.addFamily(hColumnDescriptor);
                    admin.createTable(hTableDescriptor);
                }
                hTable = new HTable(tableName, connection);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void call(Put put) throws Exception {
            hTable.put(put);
        }
    }

Solution

  • If we run on a cluster with a single shared connection, it won't work: the connection is not serializable, so it cannot be shipped to every node, and we can't open a new connection for each element of the RDD either. The solution is to use saveAsNewAPIHadoopDataset, which creates a connection on each node of the cluster and saves all elements of the RDD to HBase (or HDFS, depending on the configuration), as sketched below.
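
A minimal sketch of that approach, assuming a JavaRDD<Put> built earlier in the job (HbaseBulkWriter is just an illustrative name; the table, quorum, and znode settings mirror the ones from the question):

    package com.mycompany.app;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    import java.io.IOException;

    class HbaseBulkWriter {

        // Writes an RDD of Puts to the "data" table through TableOutputFormat;
        // each executor opens its own HBase connection, so nothing has to be
        // serialized from the driver.
        static void save(JavaRDD<Put> puts) throws IOException {
            Configuration hconf = HBaseConfiguration.create();
            hconf.set("hbase.zookeeper.quorum", "master1");
            hconf.set("hbase.zookeeper.property.clientPort", "2181");
            hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
            hconf.set(TableOutputFormat.OUTPUT_TABLE, "data");

            Job job = Job.getInstance(hconf);
            job.setOutputFormatClass(TableOutputFormat.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Put.class);

            // saveAsNewAPIHadoopDataset expects a pair RDD; TableOutputFormat only
            // writes the Put value, so the row key serves as a dummy key.
            JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = puts.mapToPair(
                    put -> new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put));

            hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration());
        }
    }

TableOutputFormat opens its own connection inside each task and flushes the writes when the task completes, which gives exactly the per-node connection described above.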