I have data split across multiple files and want to load and join them. I'd like to build a dynamic function that 1. joins n data files into a single data frame, and 2. takes the file location and join column (e.g., pk) as input.
I think this can be done with foldLeft, but I am not quite sure how. Here is my code so far:
import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

@throws[Exception]
def dataJoin(path: String, fileNames: String*): DataFrame = {
  try {
    val dfList: ArrayBuffer[DataFrame] = new ArrayBuffer
    for (fileName <- fileNames) {
      val df: DataFrame = DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName")
      dfList += df
    }
    dfList.foldLeft {
      (df, df1) => joinDataFrames(df, df1, "UID")
    }
  } catch {
    case e: Exception => throw new Exception(e)
  }
}
def joinDataFrames(df: DataFrame, df1: DataFrame, joinColumn: String): DataFrame = {
  df.join(df1, Seq(joinColumn))
}
foldLeft might indeed be suitable here, but it requires a "zero" element to start the folding from (in addition to the folding function). In this case, that "zero" can be the first DataFrame:
dfList.tail.foldLeft(dfList.head) { (df1, df2) => df1.join(df2, "UID") }
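For illustration, here is how foldLeft behaves with an explicit zero on a plain list of numbers; the DataFrame version above has exactly the same shape, with the first DataFrame as the zero and the join as the combining function (a minimal sketch, not Spark-specific):

// foldLeft starts from the "zero" value and combines it with each element, left to right.
val numbers = List(2, 3, 4)
val product = numbers.foldLeft(1) { (acc, n) => acc * n } // ((1 * 2) * 3) * 4 = 24

// Same shape with DataFrames: the zero is dfList.head, the combining function is a join:
// dfList.tail.foldLeft(dfList.head) { (acc, df) => acc.join(df, "UID") }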
To avoid errors, you probably want to make sure the list isn't empty before accessing that first item; one way of doing that is pattern matching:
dfList match {
  case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
  case Nil => spark.emptyDataFrame
}
Lastly, it's simpler, safer and more idiomatic to map over a collection instead of iterating over it and populating another (empty, mutable) collection:
val dfList = fileNames.map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
Altogether (note the .toList: fileNames is a varargs Seq, and converting it to a List is what lets the head :: tail pattern match):
def dataJoin(path: String, fileNames: String*): DataFrame = {
  val dfList = fileNames
    .map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
    .toList

  dfList match {
    case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
    case Nil => spark.emptyDataFrame
  }
}
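A call could then look like this (the path and file names are purely hypothetical, and DataFrameUtils.openFile is assumed to be the same helper used in the question):

// Joins three hypothetical files on their shared "UID" column.
val joined: DataFrame = dataJoin("/data/input", "customers.parquet", "orders.parquet", "payments.parquet")
joined.show()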