scala, apache-spark, hbase

Unable to write data to HBase from a Spark Scala DataFrame


I am unable to write to HBase using the example from the README. Here is a simplified version of my code and the error I encounter:

import org.apache.spark.sql.execution.datasources.hbase._

val input = Seq(
| ("a:1", "null", "null", "3", "4", "5", "6"),
| ("b:1", "2", "3", "null", "null", "5", "6")
)

val df = input.toDF

val TIMELINE_TABLE = "test_timeline"

val timelineCatalog =
  s"""
     "table":{"namespace":"default", "name":""".stripMargin + TIMELINE_TABLE + """", "tableCoder":"PrimitiveType"},
     |"rowkey":"key",
     |"columns":{
     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
     |"col1":{"cf":"main", "col":"kx", "type":"string"},
     |"col2":{"cf":"main", "col":"ky", "type":"string"},
     |"col3":{"cf":"main", "col":"rx", "type":"string"},
     |"col4":{"cf":"main", "col":"ry", "type":string"},
     |"col5":{"cf":"main", "col":"wx", "type":"string"},
     |"col6":{"cf":"main", "col":"wy", "type":"string"}
     |}
     |}""".stripMargin

val HBASE_CONF_FILE = "/etc/hbase/conf/hbase-site.xml"
df.write.options(Map(HBaseTableCatalog.tableCatalog -> timelineCatalog)).format("org.apache.spark.sql.execution.datasources.hbase").save()

java.lang.ClassCastException: org.json4s.JsonAST$JString cannot be cast to org.json4s.JsonAST$JObject
  at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
  at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
  ... 50 elided

My Scala version is 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131), and I'm on Spark 2.1.0.2.6.0.10-29 with HBase 1.1.2, using the hortonworks-spark/shc connector. I am unable to find anything wrong with my data; the above is actually a cleaned-up version of it. Ideally I would want the string "null" to be an actual null, but I couldn't get that to work, so I thought writing plain strings would at least get things running. Any help would be highly appreciated. I have also raised an issue on GitHub: https://github.com/hortonworks-spark/shc/issues/172
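
For reference, the catalog example in the shc README that I adapted has this general shape (trimmed to two columns here; the column families and types above are my own):

// General shape of the README's catalog: a single JSON object with
// "table", "rowkey" and "columns" as top-level keys.
val referenceCatalog = s"""{
     |"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
     |"rowkey":"key",
     |"columns":{
     |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
     |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
     |}
     |}""".stripMargin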


Solution

  • Although I haven't really found a way to do it with hortonworks-spark/shc, here is an alternative approach that writes the same data through HBase's TableOutputFormat (a sketch of the null handling follows the code):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SparkSession
    
    // The original snippet assumed an existing session; build one explicitly
    // so the example is self-contained.
    val sparkSession = SparkSession.builder().appName("hbase-writer").getOrCreate()
    import sparkSession.implicits._
    
    val input = Seq(
      ("a:1", "null", "null"),
      ("b:1", "2", "3")
    )
    
    val df = input.toDF("col0", "col1", "col2")
    
    val TIMELINE_TABLE = "prod_timeline"
    
    // Point the HBase client at the cluster's ZooKeeper ensemble.
    val config = HBaseConfiguration.create()
    config.clear()
    config.set("hbase.zookeeper.quorum", "zk0.example.net")
    config.set("zookeeper.znode.parent", "/hbase")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    
    // Turn each row into (rowkey, Put): the first column becomes the row key,
    // the remaining columns become cells in the "main" column family.
    val rdd = df.rdd.map { x =>
      val rowkey = x.get(0).toString
      val p = new Put(Bytes.toBytes(rowkey))
      p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("a"), Bytes.toBytes(x.get(1).toString))
      p.addColumn(Bytes.toBytes("main"), Bytes.toBytes("b"), Bytes.toBytes(x.get(2).toString))
      (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), p)
    }
    
    // Configure a new-API Hadoop job whose output format writes the Puts to the table.
    val newAPIJobConfiguration = Job.getInstance(config)
    newAPIJobConfiguration.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, TIMELINE_TABLE)
    newAPIJobConfiguration.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
    rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
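    
    Since the original goal was to store real nulls rather than the literal string "null", this approach also makes that straightforward: just skip the addColumn call when a value is missing. HBase has no stored null; a cell that is never written simply doesn't exist. A minimal sketch along those lines, reusing df and newAPIJobConfiguration from above (addIfPresent is just an illustrative helper):
    
    val rddSkippingNulls = df.rdd.map { row =>
      val rowkey = row.get(0).toString
      val p = new Put(Bytes.toBytes(rowkey))
    
      // Illustrative helper: only emit a cell for real, non-"null" values.
      def addIfPresent(qualifier: String, value: Any): Unit = value match {
        case s: String if s != "null" =>
          p.addColumn(Bytes.toBytes("main"), Bytes.toBytes(qualifier), Bytes.toBytes(s))
        case _ => // null or the literal "null": write nothing, the cell stays absent
      }
    
      addIfPresent("a", row.get(1))
      addIfPresent("b", row.get(2))
      (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), p)
    }.filter { case (_, put) => !put.isEmpty } // HBase rejects a Put that carries no cells
    
    rddSkippingNulls.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration)
    
    On the read side, a scan then returns no cell at all for those columns instead of the string "null".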