I am trying to load data from an HBase table into Apache Spark, but I can't figure out how to reshape it. Can somebody help me?
case class systems( rowkey: String, iacp: Option[String], temp: Option[String])
type Record = (String, Option[String], Option[String])
val hBaseRDD_iacp = sc.hbaseTable[Record]("test_table").select("iacp","temp").inColumnFamily("test_fam")
scala> hBaseRDD_iacp.map(x => systems(x._1,x._2,x._3)).toDF().show()
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7|0.051,0.052,0.055| 17.326,17.344,17.21|
| k6c| 0.056,NA,0.054|17.277,17.283,17.256|
| ad| NA,23.0| 24.0,23.6|
+--------------+-----------------+--------------------+
However, I actually want it in the following format: each comma-separated value should go on its own row, and each NA should be replaced by null. Values in the iacp and temp columns should be of float type. Each row can have a varying number of comma-separated values.
Thanks in Advance!
+--------------+-----------------+--------------------+
| rowkey| iacp| temp|
+--------------+-----------------+--------------------+
| ab7| 0.051| 17.326|
| ab7| 0.052| 17.344|
| ab7| 0.055| 17.21|
| k6c| 0.056| 17.277|
| k6c| null| 17.283|
| k6c| 0.054| 17.256|
| ad| null| 24.0|
| ad| 23.0| 23.6|
+--------------+-----------------+--------------------+
Your hBaseRDD_iacp.map(x => systems(x._1, x._2, x._3)).toDF line of code should generate a DataFrame equivalent to the following:
val df = Seq(
("ab7", Some("0.051,0.052,0.055"), Some("17.326,17.344,17.21")),
("k6c", Some("0.056,NA,0.054"), Some("17.277,17.283,17.256")),
("ad", Some("NA,23.0"), Some("24.0,23.6"))
).toDF("rowkey", "iacp", "temp")
To transform the dataset into the desired result, you can apply a UDF that pairs up the elements of the iacp and temp CSV strings to produce an array of (Option[Double], Option[Double]) tuples, which is then exploded into rows, as shown below:
import org.apache.spark.sql.functions._
import scala.util.Try
import spark.implicits._

// Pairs up the elements of two CSV strings; non-numeric values such as "NA" become None (null)
val pairUpCSV = udf { (s1: String, s2: String) =>
  def toNumericArr(csv: String) = csv.split(",").map {
    case s if Try(s.toDouble).isSuccess => Some(s.toDouble)
    case _ => None
  }
  // zipAll pads the shorter array with None so that no value is dropped
  toNumericArr(s1).zipAll(toNumericArr(s2), None, None)
}

df.
  withColumn("csv_pairs", pairUpCSV($"iacp", $"temp")).
  withColumn("csv_pair", explode($"csv_pairs")).
  select($"rowkey", $"csv_pair._1".as("iacp"), $"csv_pair._2".as("temp")).
  show(false)
// +------+-----+------+
// |rowkey|iacp |temp |
// +------+-----+------+
// |ab7 |0.051|17.326|
// |ab7 |0.052|17.344|
// |ab7 |0.055|17.21 |
// |k6c |0.056|17.277|
// |k6c |null |17.283|
// |k6c |0.054|17.256|
// |ad |null |24.0 |
// |ad |23.0 |23.6 |
// +------+-----+------+
Note that the value NA falls into the default case in the toNumericArr method and hence isn't singled out as a separate case. Also, zipAll (rather than zip) is used in the UDF to cover cases in which the iacp and temp CSV strings have different numbers of elements.
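To make the difference between zip and zipAll concrete, here is a minimal plain-Scala sketch that runs without Spark. The object name PairUpDemo and the helper pairUp are hypothetical names chosen for this illustration, and the parsing uses Try(...).toOption as a compact stand-in for the pattern match in the UDF. The sample strings are made up to have different lengths and are not taken from the table in the question.

```scala
import scala.util.Try

// PairUpDemo and pairUp are hypothetical names used only for this illustration.
object PairUpDemo {
  // Parse a CSV string; anything that is not a number (such as "NA") becomes None.
  def toNumericArr(csv: String): Array[Option[Double]] =
    csv.split(",").map(s => Try(s.trim.toDouble).toOption)

  // Pair up the elements by position, padding the shorter side with None.
  def pairUp(s1: String, s2: String): Array[(Option[Double], Option[Double])] =
    toNumericArr(s1).zipAll(toNumericArr(s2), None, None)

  def main(args: Array[String]): Unit = {
    val iacp = "0.056,NA"              // two elements
    val temp = "17.277,17.283,17.256"  // three elements

    // zip stops at the shorter array, so the value 17.256 is silently dropped.
    println(toNumericArr(iacp).zip(toNumericArr(temp)).toList)
    // List((Some(0.056),Some(17.277)), (None,Some(17.283)))

    // zipAll keeps every value and fills the gap with None.
    println(pairUp(iacp, temp).toList)
    // List((Some(0.056),Some(17.277)), (None,Some(17.283)), (None,Some(17.256)))
  }
}
```

Once the pairs are exploded in Spark, each None shows up as null in the resulting row, which is how the NA entries end up as null values in the final DataFrame.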