Tags: scala, apache-spark, foldleft

Scala: dynamically joining data frames


I have data split across multiple files, and I want to load and join them. I'd like to build a dynamic function that:

1. joins n data files into a single data frame
2. takes the file location and the join column (e.g., pk) as input

I think this can be done with foldLeft, but I am not quite sure how.

Here is my code so far:

import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

@throws[Exception]
def dataJoin(path: String, fileNames: String*): DataFrame =
{
  try
  {
    val dfList: ArrayBuffer[DataFrame] = new ArrayBuffer
    for(fileName <- fileNames)
    {
      val df:DataFrame=DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName")
      dfList += df
    }

    dfList.foldLeft
    {
      (df, df1) => joinDataFrames(df, df1, "UID")
    }
  }
  catch
  {
    case e:Exception => throw new Exception(e)
  }
}


def joinDataFrames(df: DataFrame, df1: DataFrame, joinColumn: String): DataFrame =
{
  df.join(df1, Seq(joinColumn))
}

Solution

  • foldLeft might indeed be suitable here, but it requires a "zero" element to start the folding from (in addition to the folding function). In this case, that "zero" can be the first DataFrame:

    dfList.tail.foldLeft(dfList.head) { (df1, df2) => df1.join(df2, "UID") }
    
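    For intuition, here is the same pattern on a plain Scala list, with no Spark involved (the values are illustrative only): the first argument to foldLeft is the starting value, and the function is applied left to right.

    // 0 is the "zero" element; (acc, n) => acc + n is the folding function
    val total = List(1, 2, 3).foldLeft(0)((acc, n) => acc + n)  // total == 6
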

    To avoid errors, you probably want to make sure the list isn't empty before trying to access that first item - head throws a NoSuchElementException on an empty collection. One way of doing that would be using pattern matching:

    dfList match {
      case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
      case Nil => spark.emptyDataFrame
    }
    
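    As an aside (a sketch, not part of the original answer), reduceLeftOption expresses the same fold without an explicit zero element and handles the empty case in one go:

    dfList
      .reduceLeftOption((df1, df2) => df1.join(df2, "UID"))  // fold without an explicit starting element
      .getOrElse(spark.emptyDataFrame)                       // fall back when the list is empty
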

    Lastly, it's simpler, safer, and more idiomatic to map over a collection instead of iterating over it and populating another (empty, mutable) collection:

    val dfList = fileNames.map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
    

    Altogether:

    def dataJoin(path: String, fileNames: String*): DataFrame = {
      // load each file into a DataFrame
      val dfList = fileNames
        .map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
        .toList
    
      // join the frames pairwise on "UID", starting from the first one;
      // an empty input produces an empty DataFrame
      dfList match {
        case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
        case Nil => spark.emptyDataFrame
      }
    }
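
    A hypothetical call, assuming DataFrameUtils.openFile can read the given files and every file contains a "UID" column (the file names below are made up):

    val joined = dataJoin("/data/input", "users.parquet", "orders.parquet", "events.parquet")
    joined.show()

    If you also need the join column as an input, as the question describes, you could add a joinColumn: String parameter to dataJoin and pass it through in place of the hardcoded "UID".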