I have an app that parses VCF files and inserts the data into HBase. The app runs with no issues under Apache Spark in local mode (master = local), but when I run it on a YARN cluster it fails with the following:
17/03/31 10:36:09 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:10 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:11 INFO yarn.Client: Application report for application_1490344846293_0020 (state: RUNNING)
17/03/31 10:36:12 INFO yarn.Client: Application report for application_1490344846293_0020 (state: FINISHED)
17/03/31 10:36:12 INFO yarn.Client:
client token: N/A
diagnostics: User class threw exception: java.lang.RuntimeException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the locations
ApplicationMaster host: 192.168.0.14
ApplicationMaster RPC port: 0
queue: default
start time: 1490956367991
final status: FAILED
tracking URL: http://master1:8088/proxy/application_1490344846293_0020/
user: ubuntu
Exception in thread "main" org.apache.spark.SparkException: Application application_1490344846293_0020 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1167)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1213)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/03/31 10:36:12 INFO util.ShutdownHookManager: Shutdown hook called
17/03/31 10:36:12 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-e6867ef3-fad4-424a-b6d3-f79f48bd65ea
I use the following code to connect to HBase:
package com.mycompany.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.spark.api.java.function.VoidFunction;

import java.io.IOException;

class HbaseAppender implements VoidFunction<Put> {
    private final String TABLE_NAME = "data";
    private final String COLUMN_FAMILY_NAME = "v_data";
    private static Table hTable;

    public HbaseAppender() {
        init();
    }

    // Establishes the connection to HBase and creates the table if it does not exist
    private void init() {
        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.property.clientPort", "2181");
        hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
        hconf.set("hbase.zookeeper.quorum", "master1");
        try {
            // hconf must be passed here; createConnection() with no arguments
            // ignores the settings above and falls back to the default config
            Connection connection = ConnectionFactory.createConnection(hconf);
            Admin admin = connection.getAdmin();
            TableName tableName = TableName.valueOf(TABLE_NAME);
            if (!admin.tableExists(tableName)) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(COLUMN_FAMILY_NAME);
                HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
                hTableDescriptor.addFamily(hColumnDescriptor);
                admin.createTable(hTableDescriptor);
            }
            hTable = connection.getTable(tableName);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void call(Put put) throws Exception {
        hTable.put(put);
    }
}
If we run on a cluster with a single shared connection, this won't work: the connection can't be shipped to each node (it is not serializable), and creating a new connection for every element of the RDD is not an option either. The solution is saveAsNewAPIHadoopDataset, which opens a connection on each node of the cluster and saves all elements of the RDD to HBase (or HDFS, depending on the configuration).
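A minimal sketch of that approach, assuming the same "data" table and ZooKeeper settings as above; HbaseBulkWriter and its save method are illustrative names, and the JavaRDD<Put> is assumed to come from the VCF parsing step:

```java
package com.mycompany.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

import java.io.IOException;

public class HbaseBulkWriter {

    // Saves an RDD of Puts to the "data" table; each executor opens its own
    // HBase connection, so nothing non-serializable has to cross the wire.
    public static void save(JavaRDD<Put> puts) throws IOException {
        Configuration hconf = HBaseConfiguration.create();
        hconf.set("hbase.zookeeper.property.clientPort", "2181");
        hconf.set("zookeeper.znode.parent", "/hbase-unsecure");
        hconf.set("hbase.zookeeper.quorum", "master1");
        hconf.set(TableOutputFormat.OUTPUT_TABLE, "data");

        // Job is only used to build a properly initialized output configuration
        Job job = Job.getInstance(hconf);
        job.setOutputFormatClass(TableOutputFormat.class);

        // TableOutputFormat expects (ImmutableBytesWritable, Put) pairs;
        // the key is conventionally the row key of the Put
        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = puts.mapToPair(
                put -> new Tuple2<>(new ImmutableBytesWritable(put.getRow()), put));

        hbasePuts.saveAsNewAPIHadoopDataset(job.getConfiguration());
    }
}
```

With this, the earlier HbaseAppender/foreach approach can be dropped entirely; the write is driven by the Hadoop output format, so it works the same in local and yarn-cluster mode. The sketch requires a running HBase cluster, so it is not runnable standalone.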