Tags: hadoop, apache-spark, hbase, bigdata

How to read a record from HBase into a Spark RDD (Resilient Distributed Dataset), and write an RDD record back into HBase?


I want to write code that reads a record from Hadoop HBase, stores it in a Spark RDD (Resilient Distributed Dataset), then reads one RDD record and writes it back into HBase. I have zero knowledge of either technology, and I need to run this on AWS or on a Hadoop virtual machine. Could someone please guide me on how to start from scratch?


Solution

  • Use the basic Scala code below, which reads data from an HBase table into a Spark RDD (and creates the table if it does not exist). Writing works similarly: build Put objects from your RDD records and save them into the table, as shown in the sketch after this code.

    import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    
    import org.apache.spark._
    
    object HBaseApp {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("HBaseApp").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        val conf = HBaseConfiguration.create()
        val tableName = "table1"
    
        // Run as the hdfs user and point the client at a local HBase/ZooKeeper
        System.setProperty("user.name", "hdfs")
        System.setProperty("HADOOP_USER_NAME", "hdfs")
        conf.set("hbase.master", "localhost:60000")
        conf.setInt("timeout", 100000)
        conf.set("hbase.zookeeper.quorum", "localhost")
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
        conf.set(TableInputFormat.INPUT_TABLE, tableName)
    
        // Create the table if it does not exist yet; HBase requires at least
        // one column family, so add one before calling createTable
        val admin = new HBaseAdmin(conf)
        if (!admin.isTableAvailable(tableName)) {
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("cf"))
          admin.createTable(tableDesc)
        }
    
        // Load the table as an RDD of (row key, Result) pairs via TableInputFormat
        val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
        println("Number of Records found : " + hBaseRDD.count())
        sc.stop()
      }
    }
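
  • To write an RDD back into HBase, pair each row key with a Put and save the pairs through TableOutputFormat. The following is a minimal sketch under assumptions: the table name table1, column family cf, qualifier col1, and the sample rows are placeholders for your own schema, and it uses the older Put.add API to match the read example above. After it runs, scanning table1 in the HBase shell should show the new rows under cf:col1.

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    
    import org.apache.spark._
    
    object HBaseWriteApp {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("HBaseWriteApp").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
    
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "localhost")
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
        conf.set(TableOutputFormat.OUTPUT_TABLE, "table1")
    
        // TableOutputFormat reads its settings from a Hadoop Job configuration
        val job = Job.getInstance(conf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
    
        // Turn each (rowKey, value) record into a (rowKey, Put) pair;
        // "cf" and "col1" are assumed names -- adapt them to your schema
        val records = sc.parallelize(Seq(("row1", "value1"), ("row2", "value2")))
        val puts = records.map { case (rowKey, value) =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(value))
          (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
        }
    
        // Each pair becomes one row in the HBase table
        puts.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }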