Tags: scala, apache-spark, dataframe, hive, hbase

NullPointerException when converting mapped data to DataFrame


I am writing a Spark application that takes transaction data from Hive and joins it with location data from an HBase table. The end goal is to be able to tell where each transaction happened by joining the latitude and longitude from the HBase table to the transaction data from Hive. However, I keep getting a NullPointerException when I convert the joined data set to a DataFrame.

The exception appears when I use the following:
.toDF()
.createDataFrame()
.parallelize(.toSeq)

At first I thought that some columns might have null values, so I used Option().toString to make sure there were no nulls, but the error still keeps appearing when I call any of the three methods above.
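For reference, a quick illustration (not from the original code) of what Option(...).toString actually produces in Scala: it yields the strings "Some(value)" or "None" rather than the underlying value, so Option(x).getOrElse("").toString is the usual way to replace a null with an empty string:

val raw: String = null
Option(raw).toString                     // "None"
Option(raw).getOrElse("").toString       // ""
Option("ATM123").toString                // "Some(ATM123)"
Option("ATM123").getOrElse("").toString  // "ATM123"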

I can also confirm that placeholder_Iterator.toStream is not null, as I managed to print out the data.

I have to use foreachPartition because getATMLocation() connects to an HBase table to get the latitude and longitude; a serialization error occurs if I don't use foreachPartition. Below is the code for the function:

// Relevant imports (assumed): org.apache.hadoop.hbase.client.{Scan, Table},
// org.apache.hadoop.hbase.filter.SingleColumnValueFilter,
// org.apache.hadoop.hbase.filter.CompareFilter.CompareOp, org.apache.hadoop.hbase.util.Bytes
def getATMLocation(colFamily: String, search_item: String, table: Table) = {
    // scan only the key, latitude and longitude columns
    val scanner = new Scan()
    scanner
      .addColumn(colFamily.getBytes(), atm_dict_key.getBytes())
      .addColumn(colFamily.getBytes(), atm_dict_lat.getBytes())
      .addColumn(colFamily.getBytes(), atm_dict_long.getBytes())

    // keep only rows whose key column equals search_item
    val filter = new SingleColumnValueFilter(colFamily.getBytes, atm_dict_key.getBytes(), CompareOp.EQUAL, Option(search_item).getOrElse("").toString.getBytes())
    scanner.setFilter(filter)

    val atm_locations = table.getScanner(scanner)

    // take the first matching row, if any
    val location = atm_locations.next()

    val longitude = location match {
      case null => null
      case _ => Option(Bytes.toString(location.getValue(colFamily.getBytes(), atm_dict_long.getBytes()))).getOrElse("")
    }

    val latitude = location match {
      case null => null
      case _ => Option(Bytes.toString(location.getValue(colFamily.getBytes(), atm_dict_lat.getBytes()))).getOrElse("")
    }

    atm_locations.close()

    (longitude, latitude)
  }
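As an aside, unrelated to the NullPointerException: by default a SingleColumnValueFilter also emits rows that do not contain the filtered column at all, so if the dictionary table can hold such rows the scan can be tightened. A minimal sketch, reusing the names from the function above:

val filter = new SingleColumnValueFilter(colFamily.getBytes, atm_dict_key.getBytes(), CompareOp.EQUAL, Option(search_item).getOrElse("").toString.getBytes())
// skip rows that are missing the key column entirely (this flag defaults to false)
filter.setFilterIfMissing(true)
scanner.setFilter(filter)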

Below is the problematic code for reference:

val max_records = sql(hive_query_1 + " " + period_clause.replace("|date|", "01-11-2018")).select("transac_count").as[String].collect()(0).toInt
val max_page = math.ceil(max_records.toDouble / page_limit.toDouble).toInt

val start_row = 0
val end_row = page_limit.toInt

if (max_records > 0) {
  for (page <- 0 to max_page - 1) {

    val hiveDF = sql("SELECT " + hive_columns + " FROM (" + (hive_query_2 + " " + period_clause.replace("|date|", "01-11-2018")) + ") as trans_data WHERE rowid BETWEEN " + (start_row + (page * page_limit.toInt)).toString + " AND " + ((end_row + (page * page_limit.toInt)) - 1).toString)
      .withColumn("uuid", timeUUID())
      .withColumn("created_dt", current_timestamp())

    hiveDF.show()

    hiveDF.rdd.foreachPartition { iter =>
      val hbaseconfig = HBaseConfiguration.create()
      hbaseconfig.set("keytab.file", keytab)
      val hbase_connection = ConnectionFactory.createConnection(hbaseconfig)
      val table = hbase_connection.getTable(TableName.valueOf(hbase_table))
      val regionLoc = hbase_connection.getRegionLocator(table.getName)
      val admin = hbase_connection.getAdmin

      val atm_dict_table = hbase_connection.getTable(TableName.valueOf(atm_dict_tbl))

      val placeholder_Iterator = iter.map(r => {
        val location = Query.getATMLocation(atm_dict_col_family, Option(r.get(14)).getOrElse("").toString, atm_dict_table)
        (Option(r.get(0)).toString, Option(r.get(1)).toString, Option(r.get(2)).toString, Option(r.get(3)).toString,
          Option(r.get(4)).toString, Option(r.get(5)).toString, Option(r.get(6)).toString, Option(r.get(7)).toString,
          Option(r.get(8)).toString, Option(r.get(9)).toString, Option(r.get(10)).toString, Option(r.get(11)).toString,
          Option(r.get(12)).toString, Option(r.get(13)).toString, Option(r.get(14)).toString, Option(r.get(15)).toString,
          Option(r.get(16)).toString, Option(location._1).toString, Option(location._2).toString)
      })

      // fails here: toDF needs the SparkSession (via implicits), which is not available on the executors
      val test = placeholder_Iterator.toStream.toDF(new_column_names: _*)
      test.foreach(x => println(x))
    }
  }
}

Below is the error that is returned:

java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
    at TransactionData$$anonfun$main$2$$anonfun$apply$1$$anonfun$apply$mcVI$sp$1.apply(TransactionData.scala:109)
    at TransactionData$$anonfun$main$2$$anonfun$apply$1$$anonfun$apply$mcVI$sp$1.apply(TransactionData.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I really hope the joined data can be converted to a DataFrame so that I can write it to an HFile and bulk load it into HBase.


Solution

  • I have found the answer. The reason for the NullPointerException is that DataFrames, RDDs and Datasets can only be created on the driver; the code above calls toDF inside foreachPartition, which runs on the executors, where no SparkSession is available. This post explains it:

    Spark : how can i create local dataframe in each executor
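
    A minimal sketch of one way to restructure the code under that constraint, assuming the names from the question (hiveDF, atm_dict_tbl, atm_dict_col_family, new_column_names, Query.getATMLocation) and a SparkSession named spark: do the HBase lookups on the executors with mapPartitions, which returns an RDD, and only build the DataFrame back on the driver.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val enrichedRDD = hiveDF.rdd.mapPartitions { iter =>
      // one HBase connection per partition, created on the executor
      val hbaseconfig = HBaseConfiguration.create()
      val hbase_connection = ConnectionFactory.createConnection(hbaseconfig)
      val atm_dict_table = hbase_connection.getTable(TableName.valueOf(atm_dict_tbl))

      val rows = iter.map { r =>
        val location = Query.getATMLocation(atm_dict_col_family, Option(r.get(14)).getOrElse("").toString, atm_dict_table)
        // append longitude and latitude to the original row values, replacing nulls with ""
        Row.fromSeq(r.toSeq.map(v => Option(v).map(_.toString).getOrElse("")) ++
          Seq(Option(location._1).getOrElse(""), Option(location._2).getOrElse("")))
      }.toList // materialise the partition before closing the connection

      hbase_connection.close()
      rows.iterator
    }

    // back on the driver, where a DataFrame can legally be created
    val schema = StructType(new_column_names.map(StructField(_, StringType, nullable = true)))
    val joinedDF = spark.createDataFrame(enrichedRDD, schema)
    joinedDF.show()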