I am unable to write to Hbase using the example mentioned in readme. Here is a simple code that illustrates my approach and the error I encounter
import org.apache.spark.sql.execution.datasources.hbase._
val input = Seq(
| ("a:1", "null", "null", "3", "4", "5", "6"),
| ("b:1", "2", "3", "null", "null", "5", "6")
)
val df = input.toDF
val TIMELINE_TABLE = "test_timeline"
val timelineCatalog =
s"""
"table":{"namespace":"default", "name":""".stripMargin+ TIMELINE_TABLE +"""", "tableCoder":"PrimitiveType"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"main", "col":"kx", "type":"string"},
|"col2":{"cf":"main", "col":"ky", "type":"string"},
|"col3":{"cf":"main", "col":"rx", "type":"string"},
|"col4":{"cf":"main", "col":"ry", "type":string"},
|"col5":{"cf":"main", "col":"wx", "type":"string"},
|"col6":{"cf":"main", "col":"wy", "type":"string"}
|}
|}""".stripMargin
val HBASE_CONF_FILE = "/etc/hbase/conf/hbase-site.xml"
df.write.options(Map(HBaseTableCatalog.tableCatalog -> timelineCatalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()
java.lang.ClassCastException: org.json4s.JsonAST$JString cannot be cast to org.json4s.JsonAST$JObject
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided
My Scala version is 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
, and I'm on Spark 2.1.0.2.6.0.10-29
and hbase 1.1.2
I am using hortonworks-spark/shc connector and I am unable to find anything wrong with my data. It is actually a clean version of it. In ideal scenario I would want the string "null"
to be actual null
. But I couldn't get that to work, so I thought creating strings would at least get it working. Any help would be highly appreciated. I have also raised an issue on Github as well https://github.com/hortonworks-spark/shc/issues/172
Although I haven't really found out a way using hortonworks/shc
here is an alternative approach for doing the same
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SparkSession}
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._
val input = Seq(
("a:1", "null", "null"),
("b:1", "2", "3")
)
val df = input.toDF("col0", "col1", "col2")
val TIMELINE_TABLE = "prod_timeline"
val config = HBaseConfiguration.create()
config.clear()
config.set("hbase.zookeeper.quorum", "zk0.example.net");
config.set("zookeeper.znode.parent", "/hbase")
config.set("hbase.zookeeper.property.clientPort", "2181")
config.set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)
val rdd = df.rdd.map(x => {
val rowkey = x.get(0).toString
var p = new Put(rowkey.getBytes())
p.addColumn("main".toCharArray.map(_.toByte), "a".toCharArray.map(_.toByte), x.get(1).toString.getBytes())
p.addColumn("main".toCharArray.map(_.toByte), "b".toCharArray.map(_.toByte), x.get(2).toString.getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), p)
})
val newAPIJobConfiguration = Job.getInstance(config)
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[String]])
rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration());