Search code examples
scaladataframeapache-sparkfoldleft

select and withcolumn both are not working with foldleft


Trying to explode given columns from nested schema. I am trying to achieve this with fold-left on the dataframe.

Here I have handled only two scenarios

  1. If column type is struct then I am trying to get by select clause
  2. If column type is array then I am trying to explode data by using withColumn then select clause

Here is the my schema :

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Array(
    StructField("RootData", StructType(Seq(
        StructField("Rates",ArrayType(StructType(Array(
            StructField("Code",StringType,true), 
            StructField("Rate",StringType,true), 
            StructField("Type",StringType,true), 
            StructField("TargetValue",StringType,true)))), true), 
        StructField("RecordCount",LongType,true))),true), 
    StructField("CreationDate",StringType,true), 
    StructField("SysID",StringType,true), 
    StructField("ImportID",StringType,true)))


|-- RootData: struct (nullable = true)
|    |-- Rates: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- Code: string (nullable = true)
|    |    |    |-- Rate: string (nullable = true)
|    |    |    |-- Type: string (nullable = true)
|    |    |    |-- TargetValue: string (nullable = true)
|    |-- RecordCount: long (nullable = true)
|-- CreationDate: string (nullable = true)
|-- SysID: string (nullable = true)
|-- ImportID: string (nullable = true)

And below is the code snippet :

 // Here sourceDF has nested schema dataframe
 // List of  nested columns 
def execute(sourceDf: DataFrame, exp_Cols : Array[String]) = {
    var list = Array[String]()
    val df = exp_Cols.foldLeft(sourceDf){(df, colName) =>
        if ( df.columns.contains(colName) ) {
            val typeName = df.schema( colName ).dataType.typeName
            println("typeName " + typeName)
            if ( typeName == "struct" || typeName == "array") list = list :+ colName
            if (typeName == "struct") df.selectExpr("*",  colName + ".*")
            else if (typeName == "array") df.withColumn(colName, explode(col(colName))).selectExpr("*",  colName + ".*")
            else df 
        }
        df
    }
    println(list.toList)
    df.drop(list:_*)
}

But When I am trying with below statement its working as expected. same thing I have written with foldleft.

 nestedDf.selectExpr("*", "RootData.*").withColumn("Rates",explode($"Rates")).selectExpr("*","Rates.*").drop("RootData", "Rates")

Did I make any mistake in above method, or can we achieve this in better way.

Am using spark2.30 version and scala 2.11 version

Edit :

Please find the below sample data :

val jsonStr = """{"RootData":{"Rates":[{"Code":"USD","Rate":"2.007500000","Type":"Common","TargetValue":"BYR"},
{"Code":"USD","Rate":"357.300000000","Type":"Common","TargetValue":"MRO"},
{"Code":"USD","Rate":"21005.000000000","Type":"Common","TargetValue":"STD"},
{"Code":"USD","Rate":"248520.960000000","Type":"Common","TargetValue":"VEF"},
{"Code":"USD","Rate":"77.850000000","Type":"Common","TargetValue":"AFN"},
{"Code":"USD","Rate":"475.150000000","Type":"Common","TargetValue":"AMD"},
{"Code":"USD","Rate":"250.000000000","Type":"Common","TargetValue":"YER"},
{"Code":"USD","Rate":"15.063500000","Type":"Common","TargetValue":"ZAR"},
{"Code":"USD","Rate":"13.291500000","Type":"Common","TargetValue":"ZMW"},
{"Code":"USD","Rate":"1.000000000","Type":"Common","TargetValue":"USD"}
],"RecordCount":10}, "CreationDate":"2020-01-01","SysID":"987654321","ImportID":"123456789"}"""

val nestedDf = spark.read.json(Seq(jsonStr).toDS)
val exp_cols = Array("RootData", "Rates")
execute(nestedDf, exp_cols)

Temporary solution I am using as below :

def forStructTypeCol(df : DataFrame, colName: String) = df.selectExpr("*", colName +".*")
def forArrayTypeCol(df : DataFrame, colName: String) = df.withColumn(colName, explode(col(colName))).selectExpr("*", colName +".*")
var t_nestedDf = nestedDf
exp_cols.foreach(colName=> { t_nestedDf =  if ( t_nestedDf.columns.contains(colName) ) { val typeName = t_nestedDf.schema( colName ).dataType.typeName ; if ( typeName == "struct") forStructTypeCol(t_nestedDf, colName) else if (typeName == "array") forArrayTypeCol(t_nestedDf, colName) else t_nestedDf } else t_nestedDf  })
val finaldf = t_nestedDf.drop(exp_cols:_*)

Solution

  • I think your code is wrong because you always return df and not df enriched with the additional columns (maybe you are missing the else clause?):

    def execute(sourceDf: DataFrame, exp_Cols : Array[String]) = {
        var list = Array[String]()
        val df = exp_Cols.foldLeft(sourceDf){(df, colName) =>
            if ( df.columns.contains(colName) ) {
                val typeName = df.schema( colName ).dataType.typeName
                println("typeName " + typeName)
                if ( typeName == "struct" || typeName == "array") list = list :+ colName
                if (typeName == "struct") df.selectExpr("*",  colName + ".*")
                else if (typeName == "array") df.withColumn(colName, explode(col(colName))).selectExpr("*",  colName + ".*")
                else df 
            } else {
                df
            }
        }
        println(list.toList)
        df.drop(list:_*)
    }