Tags: scala, apache-spark, apache-spark-sql, hbase

Retrieve and format data from HBase to Scala DataFrame


I am trying to get data from an HBase table into the Apache Spark environment, but I cannot figure out how to format it. Can somebody help me?

case class systems(rowkey: String, iacp: Option[String], temp: Option[String])

type Record = (String, Option[String], Option[String])

// sc.hbaseTable comes from an HBase connector; presumably it.nerdammer's
// spark-hbase-connector, i.e. import it.nerdammer.spark.hbase._
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp", "temp").inColumnFamily("test_fam")

scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
|        rowkey|             iacp|                temp|
+--------------+-----------------+--------------------+
|           ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
|           k6c|   0.056,NA,0.054|17.277,17.283,17.256|
|            ad|          NA,23.0|           24.0,23.6|
+--------------+-----------------+--------------------+

However, I actually want it in the following format: each comma-separated value on its own row, with each NA replaced by null. The values in the iacp and temp columns should be of float type. Note that each row can have a varying number of comma-separated values.

Thanks in advance!

+--------------+-----------------+--------------------+
|        rowkey|             iacp|                temp|
+--------------+-----------------+--------------------+
|           ab7|            0.051|              17.326|
|           ab7|            0.052|              17.344|
|           ab7|            0.055|               17.21|
|           k6c|            0.056|              17.277|
|           k6c|             null|              17.283|
|           k6c|            0.054|              17.256|
|            ad|             null|                24.0|
|            ad|             23.0|                23.6|
+--------------+-----------------+--------------------+

Solution

  • Your hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF line of code should generate a DataFrame equivalent to the following:

    val df = Seq(
      ("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
      ("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
      ("ad", Some("NA,23.0"), Some("24.0,23.6"))
    ).toDF("rowkey", "iacp", "temp")
    

    To transform the dataset into the wanted result, you can apply a UDF that pairs up the elements of the iacp and temp CSV strings to produce an array of (Option[Double], Option[Double]) tuples, which is then exploded, as shown below:

    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    def pairUpCSV = udf{ (s1: String, s2: String) =>
      import scala.util.Try
      // Parse each CSV element into an Option[Double]; any non-numeric value (e.g. "NA") becomes None
      def toNumericArr(csv: String) = csv.split(",").map{
        case s if Try(s.toDouble).isSuccess => Some(s.toDouble)
        case _ => None
      }
      // zipAll (unlike zip) pads the shorter array with None, so rows with unequal CSV lengths still pair up
      toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
    }
    
    df.
      withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
      withColumn("csv_pair", explode($"csv_pairs")).
      select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
      show(false)
    // +------+-----+------+
    // |rowkey|iacp |temp  |
    // +------+-----+------+
    // |ab7   |0.051|17.326|
    // |ab7   |0.052|17.344|
    // |ab7   |0.055|17.21 |
    // |k6c   |0.056|17.277|
    // |k6c   |null |17.283|
    // |k6c   |0.054|17.256|
    // |ad    |null |24.0  |
    // |ad    |23.0 |23.6  |
    // +------+-----+------+
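
    Since toNumericArr produces Option[Double] values, the exploded iacp and temp columns come out typed as double (i.e. the float-type values asked for). You can verify this by printing the schema of the transformed DataFrame; in the spark-shell it should show:

    df.
      withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
      withColumn("csv_pair", explode($"csv_pairs")).
      select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
      printSchema()
    // root
    //  |-- rowkey: string (nullable = true)
    //  |-- iacp: double (nullable = true)
    //  |-- temp: double (nullable = true)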
    

    Note that the value NA falls into the default case in method toNumericArr, hence isn't singled out as a separate case. Also, zipAll (rather than zip) is used in the UDF to cover cases in which the iacp and temp CSV strings have different numbers of elements, as the sketch below illustrates.
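
    For illustration, here's a quick REPL sketch (with toNumericArr lifted out of the UDF and defined on its own) showing both behaviors, i.e. NA mapping to None and zipAll padding the shorter side:

    scala> toNumericArr("0.056,NA,0.054")
    res0: Array[Option[Double]] = Array(Some(0.056), None, Some(0.054))

    scala> toNumericArr("0.051,0.052").zipAll(toNumericArr("17.326"), None, None)
    res1: Array[(Option[Double], Option[Double])] = Array((Some(0.051),Some(17.326)), (Some(0.052),None))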