I have a dataframe of the following structure:
A: Array[String] | B: Array[String] | [ ... multiple other columns ... ]
=========================================================================
[A, B, C, D]     | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]  | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]              | [10]             | [ ... array with 1 element ... ]
I want to write a UDF that zips the arrays of all these columns element-wise, so that the result can be exploded into one row per element. The resulting column should look like this:
ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]
At the moment I'm calling a UDF once per column name (the list of column names is collected at runtime beforehand), like this:
import org.apache.spark.sql.functions.{explode, udf}

val myudf6 = udf((xa: Seq[Seq[String]], xb: Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // append one element to each inner array of the zip column
  })
})

val allColumnNames = df.columns.filter(...)
// df is a var; the "zipped" column is assumed to be seeded beforehand
for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf6(df("zipped"), df(columnName)))
}
df = df.withColumn("zipped", explode(df("zipped")))
Since the dataframe can have hundreds of columns, this iterative withColumn call seems to take a long time.

Question(s): Is it possible to do this with one UDF and a single df.withColumn(...) call?

Important: the UDF should zip a dynamic number of columns (known only at runtime).
Use a UDF that takes a variable number of columns as input. This can be done with an array of arrays (assuming the element types are the same). Since you then have an array of arrays, you can use transpose, which achieves the same result as zipping the lists together. The resulting array can then be exploded.
import org.apache.spark.sql.functions.{array, col, explode, udf}

// Transpose the array of arrays: row i of the result holds the i-th
// element of every input array, i.e. an element-wise zip.
val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
  cols.transpose
})

val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))
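As a quick sanity check of why transpose works here, and one way to consume the result afterwards; a minimal sketch, where the output names letter and number are made up for this example:

// Plain Scala: transpose performs an element-wise zip of the inner sequences.
Seq(Seq("A", "B", "C"), Seq("1", "2", "3")).transpose
// => Seq(Seq("A", "1"), Seq("B", "2"), Seq("C", "3"))

// The exploded column is an Array[String], so positions can be pulled
// out by index if named columns are wanted:
val named = df2.select(
  col("exploded")(0).as("letter"), // hypothetical name for the first array column
  col("exploded")(1).as("number")  // hypothetical name for the second array column
)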
Note that in Spark 2.4+ it is possible to use arrays_zip instead of a UDF:
val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))
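One caveat: arrays_zip returns an array of structs rather than an array of strings, so after the explode the exploded column is a struct (in Spark 2.4 the struct fields take the names of the source columns when plain column references are passed in). As a sketch, it can then be flattened with a star select:

// Expand the struct produced by arrays_zip into one top-level column
// per original array column.
val flattened = df2.select("exploded.*")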