Tags: apache-spark, apache-spark-sql, user-defined-functions, apache-spark-dataset

Zip and Explode multiple Columns in Spark SQL Dataframe


I have a dataframe of the following structure:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

I want to write a UDF that

  1. Zips the elements at the i-th position of each column in the DF
  2. Explodes the DF on each of these zipped tuples

The resulting column should look like this:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]
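
For two columns, this per-position pairing is exactly what Scala's zip does on plain collections; a throwaway illustration with the values from the first row:

Seq("A", "B", "C", "D").zip(Seq("1", "2", "3", "4"))
// => List((A,1), (B,2), (C,3), (D,4))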

At the moment I'm using multiple calls (one per column name; the list of column names is collected at runtime) to a UDF like this:

import org.apache.spark.sql.functions.{explode, udf}

// Appends the i-th element of the new column to the i-th tuple collected so far
val myudf6 = udf((xa: Seq[Seq[String]], xb: Seq[String]) => {
  xa.indices.map(i => xa(i) :+ xb(i))
})

val allColumnNames = df.columns.filter(...)

// assumes "zipped" was initialized beforehand as a column of single-element tuples
for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf6(df("zipped"), df(columnName)))
}
df = df.withColumn("zipped", explode(df("zipped")))

Since the dataframe can have hundreds of columns, this iterative chain of withColumn calls takes a long time.

Question(s): Is it possible to do this with one UDF and a single DF.withColumn(...) call?

Important: The UDF should zip a dynamic number of columns (read at runtime).


Solution

  • Use a UDF that takes a variable number of columns as input. This can be done with an array of arrays (assuming the types are the same). Since you have an array of arrays, it's possible to use transpose, which achieves the same result as zipping the lists together. The resulting array can then be exploded.

    // transpose swaps the dimensions: element i of every column ends up in tuple i
    val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
      cols.transpose
    })
    
    val allColumnNames = df.columns.filter(...).map(col)
    val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))
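
    For reference, a minimal self-contained run of the above on sample data mirroring the question (a SparkSession named spark, the column names A and B, and the rows are assumptions made for illustration; array_zip_udf is the UDF defined above):

    import spark.implicits._
    import org.apache.spark.sql.functions.{array, col, explode}

    val df = Seq(
      (Seq("A", "B", "C", "D"), Seq("1", "2", "3", "4")),
      (Seq("J"), Seq("10"))
    ).toDF("A", "B")

    val cols = Seq("A", "B").map(col)
    df.withColumn("exploded", explode(array_zip_udf(array(cols: _*))))
      .select("exploded")
      .show(false)
    // one row per tuple: [A, 1], [B, 2], [C, 3], [D, 4], [J, 10]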
    

    Note that in Spark 2.4+ it would be possible to use arrays_zip instead of a UDF:

    val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))
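
    One caveat: arrays_zip returns an array of structs rather than an array of arrays, so the exploded column holds one struct per tuple. If a flat Array[String] is needed, the struct fields can be repacked with array(...); a sketch reusing the hypothetical A and B columns from above (that the struct fields are named after the input columns is an assumption worth verifying on your Spark version):

    import org.apache.spark.sql.functions.{array, arrays_zip, col, explode}

    val df3 = df
      .withColumn("exploded", explode(arrays_zip(col("A"), col("B"))))
      // repack the struct fields into a flat array of strings
      .withColumn("exploded", array(col("exploded.A"), col("exploded.B")))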