For a set of DataFrames
val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")
to union all of them I do
df1.unionAll(df2).unionAll(df3)
Is there a more elegant and scalable way of doing this for any number of dataframes, for example from
Seq(df1, df2, df3)
The simplest solution is to reduce with union (unionAll in Spark < 2.0):
val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)
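Note that reduce throws on an empty sequence; if the input might be empty, a small variant with reduceOption returns an Option instead (a minimal sketch):

import org.apache.spark.sql.DataFrame

// reduceOption returns None for an empty Seq instead of throwing
val merged: Option[DataFrame] = dfs.reduceOption(_ union _)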
This is relatively concise and shouldn't move data from off-heap storage, but it extends the lineage with each union and requires non-linear time to perform plan analysis, which can become a problem if you try to merge a large number of DataFrames.
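To see this effect yourself, you can print the plan after the chained unions with explain (a quick sketch; the exact output depends on your Spark version):

// Each union adds another Union node to the logical plan;
// explain(true) prints the plans the analyzer has to process
dfs.reduce(_ union _).explain(true)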
You can also convert the DataFrames to RDDs and use SparkContext.union:
dfs match {
  // Single DataFrame: nothing to union
  case h :: Nil => Some(h)
  // Two or more: union the underlying RDDs in one shot, then rebuild
  // a DataFrame using the schema of the first one
  case h :: _ => Some(h.sqlContext.createDataFrame(
    h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
    h.schema
  ))
  // Empty input: no DataFrame to return
  case Nil => None
}
It keeps the lineage short and the analysis cost low, but otherwise it is less efficient than merging DataFrames directly.
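In Spark 2.x the same idea can be written against SparkSession; a sketch, assuming spark is the active session and dfs is known to be non-empty:

// Union the underlying RDD[Row]s, then rebuild a DataFrame
// from the first DataFrame's schema
val unioned = spark.createDataFrame(
  spark.sparkContext.union(dfs.map(_.rdd)),
  dfs.head.schema
)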