I'm running a spark job to generate HFiles
for my HBase
data store.
It used to be working fine with my Cloudera
cluster, but when we switched to EMR
cluster, it fails with following stacktrace:
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 50 31 36 31 32 37 30 33 34 5f 49 36 35 38 34 31 35 38 35); not retrying
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 50 31 36 31 32 37 30 33 34 5f 49 36 35 38 34 31 35 38 35)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1492)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1492)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:803)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:803)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1720)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1675)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1664)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:629)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1158)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1085)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1005)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:996)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:996)
at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopFile(JavaPairRDD.scala:823)
My questions:
spark-submit --conf spark.kryo.classesToRegister=org.apache.hadoop.hbase.io.ImmutableBytesWritable,org.apache.hadoop.hbase.KeyValue --master yarn --deploy-mode client --driver-memory 16G --executor-memory 18G ...
but still, I got the same error.Here's my Java code:
protected void generateHFilesUsingSpark(JavaRDD<Row> rdd) throws Exception {
JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRdd = rdd.mapToPair(
new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
String key = (String) row.get(0);
String value = (String) row.get(1);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
byte[] rowKeyBytes = Bytes.toBytes(key);
rowKey.set(rowKeyBytes);
KeyValue keyValue = new KeyValue(rowKeyBytes,
Bytes.toBytes("COL"),
Bytes.toBytes("FM"),
ProductJoin.newBuilder()
.setId(key)
.setSolrJson(value)
.build().toByteArray());
return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
}
});
Configuration baseConf = HBaseConfiguration.create();
Configuration conf = new Configuration();
conf.set(HBASE_ZOOKEEPER_QUORUM, "xxx.xxx.xx.xx");
Job job = new Job(baseConf, "APP-NAME");
HTable table = new HTable(conf, "hbaseTargetTable");
Partitioner partitioner = new IntPartitioner(importerParams.shards);
JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
javaPairRdd.repartitionAndSortWithinPartitions(partitioner);
HFileOutputFormat2.configureIncrementalLoad(job, table);
System.out.println("Done configuring incremental load....");
Configuration config = job.getConfiguration();
repartitionedRdd.saveAsNewAPIHadoopFile(
"hfilesOutputPath",
ImmutableBytesWritable.class,
KeyValue.class,
HFileOutputFormat2.class,
config
);
System.out.println("Saved to HFiles to: " + importerParams.hfilesOutputPath);
}
All right, problem solved, the trick is to use KyroSerializer, I added this in my Java code to register ImmutableBytesWritable.
SparkSession.Builder builder = SparkSession.builder().appName("AWESOME");
builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
SparkConf conf = new SparkConf().setAppName("AWESOME");
Class<?>[] classes = new Class[]{org.apache.hadoop.hbase.io.ImmutableBytesWritable.class};
conf.registerKryoClasses(classes);
builder.config(conf);
SparkSession spark = builder.getOrCreate();