Tags: scala, apache-spark, apache-spark-sql

Spark unionAll multiple dataframes


For a set of dataframes

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

to union all of them I do

df1.unionAll(df2).unionAll(df3)

Is there a more elegant and scalable way of doing this for any number of dataframes, for example from

Seq(df1, df2, df3) 

Solution

  • The simplest solution is to reduce with union (unionAll in Spark < 2.0):

    val dfs = Seq(df1, df2, df3)
    dfs.reduce(_ union _)
    

    This is relatively concise and shouldn't move data out of off-heap storage, but it extends the lineage with each union, and plan analysis takes non-linear time in the lineage length. That can become a problem if you try to merge a large number of DataFrames.
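
    To see why the analysis cost grows, here is a plain-Scala illustration (no Spark required, and not part of the original answer): reduce builds a left-nested chain, so the logical plan that the analyzer has to walk gets one level deeper per DataFrame. The Plan/Leaf/Union types below are hypothetical stand-ins for Spark's logical plan nodes.

    ```scala
    // Toy model of a logical plan tree.
    sealed trait Plan
    case class Leaf(name: String) extends Plan
    case class Union(left: Plan, right: Plan) extends Plan

    // Depth of the tree = number of levels the analyzer must traverse.
    def depth(p: Plan): Int = p match {
      case Leaf(_)     => 1
      case Union(l, r) => 1 + math.max(depth(l), depth(r))
    }

    val plans: Seq[Plan] = Seq(Leaf("df1"), Leaf("df2"), Leaf("df3"), Leaf("df4"))

    // reduce produces Union(Union(Union(df1, df2), df3), df4): a left-deep chain.
    val chained = plans.reduce(Union(_, _))
    assert(depth(chained) == 4)
    ```

    With n DataFrames the chain is n levels deep, which is why repeated analysis over the whole tree stops being cheap as n grows.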

    You can also convert to RDDs and use SparkContext.union:

    dfs.toList match {
      case h :: Nil => Some(h)
      case h :: _   => Some(h.sqlContext.createDataFrame(
                         h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                         h.schema
                       ))
      case Nil      => None
    }
    

    It keeps the lineage short and the analysis cost low, but otherwise it is less efficient than merging DataFrames directly. Note the dfs.toList conversion above: the :: pattern only matches a List at runtime, so matching an arbitrary Seq directly would throw a MatchError.
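
    If you only need the empty-input safety and not the RDD-level union, reduceOption gives the same Option result with less ceremony; in Spark that would be dfs.reduceOption(_ union _). A plain-Scala sketch of the pattern, using sequence concatenation as a stand-in for union (unionAllSafe is a hypothetical helper name):

    ```scala
    // Empty-safe "union all": None for no inputs, Some(merged) otherwise.
    // Seq concatenation (++) plays the role of DataFrame union here.
    def unionAllSafe[A](xs: Seq[Seq[A]]): Option[Seq[A]] =
      xs.reduceOption(_ ++ _)

    assert(unionAllSafe(Seq.empty[Seq[Int]]) == None)
    assert(unionAllSafe(Seq(Seq(1, 2), Seq(3))) == Some(Seq(1, 2, 3)))
    ```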