Tags: scala, exception, casting, apache-spark-sql, rdd

Converting List of List or RDD to DataFrame in Spark-Scala


What I am trying to achieve: I have a table with (say) 4 columns and expose it as a DataFrame, DF1. I want to store each row of DF1 into another Hive table (DF2, with the schema Column1, Column2, Column3), where the Column3 value is the '-' delimited row of DF1.

import org.apache.spark.sql.Column

val df = hiveContext.sql("SELECT * FROM hive_table")
val writeToHiveDf = df.filter(new Column("id").isNotNull)

import scala.collection.mutable.ListBuffer

var builder: List[(String, String, String)] = Nil
val finalOne = new ListBuffer[List[(String, String, String)]]()
writeToHiveDf.rdd.collect().foreach { row =>
  val item = row.mkString("-@") // '-@' delimited row, currently unused below
  builder = List(List("dummy", "NEVER_NULL_CONSTRAINT", "some alpha")).map { case List(a, b, c) => (a, b, c) }
  finalOne += builder
}

Now finalOne is effectively a list of lists (a ListBuffer whose elements are each a List containing a single tuple), and I want to convert it to a DataFrame, either directly or via an RDD.

var listRDD = sc.parallelize(finalOne) // Converts to RDD - this works
val dataFrameForHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data") // Doesn't work

Error:

java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:94)

Can someone help me understand the right way to convert this to a DataFrame? Thanks a ton in advance for your support.


Solution

  • I believe flattening your finalOne collection before passing it to sc.parallelize() should give an outcome in line with what you are expecting. finalOne is a ListBuffer[List[(String, String, String)]], so parallelizing it yields an RDD whose elements are Lists; Spark reads those as ArrayType, which cannot be cast to the StructType a row requires, hence the exception. Flattening first yields a flat sequence of tuples, and tuples map cleanly to rows:

    var listRDD = sc.parallelize(finalOne.flatten) // RDD[(String, String, String)]

    val dataFrameForHive: DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")
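
For completeness, here is a minimal end-to-end sketch of the same idea, assuming a Spark 1.x HiveContext as in the question (the target table name hive_table_2 is a placeholder). It also sidesteps the intermediate ListBuffer: the tuples are built with a map on the RDD itself, so the rows are never collect()ed to the driver only to be re-parallelized:

    import org.apache.spark.sql.DataFrame
    // toDF on an RDD of tuples requires the context's implicits in scope.
    import hiveContext.implicits._

    val df = hiveContext.sql("SELECT * FROM hive_table")
    val writeToHiveDf = df.filter(df("id").isNotNull)

    // A tuple is a Product, which Spark maps to a StructType (one struct
    // per row); a List would be read as ArrayType, causing the original
    // ClassCastException.
    val dataFrameForHive: DataFrame = writeToHiveDf.rdd
      .map(row => ("dummy", "NEVER_NULL_CONSTRAINT", row.mkString("-")))
      .toDF("table_name", "constraint_applied", "data")

    dataFrameForHive.write.mode("append").saveAsTable("hive_table_2")

Mapping on the RDD keeps the work distributed; collect() followed by sc.parallelize() funnels every row through the driver, which works for small tables but will not scale.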