I have some large data in Parquet format. I'd like to use Spark in Scala to transform this file and save it in JSON format. My input file looks something like this:
a1, a2, b, d, c1, c2, c3, e (header)
1, null, 2, "1d", "abc", null, null, 7 (row1)
null, 3, 4, "2c", null, null, "def", 9 (row2)
... more rows to follow
a1 and a2 are essentially the same column; for each row, only one of them is not null. The same applies to c1, c2 and c3. I'd like to process my input file with two transformations. The first one is to form the correct a and c column for each row and remove the null ones, so the output will look something like this:
a, b, d, c, e (header)
1, 2, "1d", "abc", 7 (row1)
3, 4, "2c", "def", 9 (row2)
... more rows to follow
The second transformation is based on column d: if the value of d falls into one set of values ("1d", "12", "a9", "df"), I'd like to add one more value ("1f8") to this row; if not, add the other value ("d8n"). The output will look like this after the second transformation:
a, b, d, c, e, f (header)
1, 2, "1d", "abc", 7, "1f8" (row1)
3, 4, "2c", "def", 9, "d8n" (row2)
... more rows to follow
Note: column f is newly added based on the value of d.
Sample Data
scala> df.show(false)
+----+----+---+---+----+----+----+---+
|a1 |a2 |b |d |c1 |c2 |c3 |e |
+----+----+---+---+----+----+----+---+
|1 |null|2 |4 |abc |null|null|7 |
|null|3 |4 |1 |null|null|def |9 |
+----+----+---+---+----+----+----+---+
First Transformation: apply the nvl function to the columns a1, a2, c1, c2, c3. The column groups are built dynamically from the column names: columns whose name is longer than one character are grouped by their first letter.
scala> val columns = (
  df
    .columns
    .filter(_.length > 1)          // multi-character names: a1, a2, c1, c2, c3
    .groupBy(c => c(0))            // group by first letter: 'a' -> [a1, a2], 'c' -> [c1, c2, c3]
    .map { case (prefix, cols) =>
      if (cols.length > 2)
        // more than two columns: nest the pairwise nvl expressions
        s"${cols.sliding(2).map(w => s"nvl(${w.head},${w.last})").mkString("nvl(", ",", ")")} as $prefix"
      else
        // exactly two columns: a single nvl is enough
        s"${cols.sliding(2).map(w => s"nvl(${w.head},${w.last})").mkString} as $prefix"
    } ++
  df.columns.filter(_.length == 1) // single-character names (b, d, e) pass through unchanged
).toSeq
The code above builds the column expressions shown below:
columns: Seq[String] = List(nvl(a1,a2) as a, nvl(nvl(c1,c2),nvl(c2,c3)) as c, b, d, e)
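As a side note, if the column groups are fixed rather than derived from the names, a shorter sketch using Spark's built-in coalesce function (which, like nvl, returns its first non-null argument) could collapse them directly; the column names below are taken from the sample data:

import org.apache.spark.sql.functions.coalesce

val collapsed = df.select(
  coalesce($"a1", $"a2").as("a"),        // first non-null of a1/a2
  $"b",
  $"d",
  coalesce($"c1", $"c2", $"c3").as("c"), // first non-null of c1/c2/c3
  $"e"
)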
scala> import org.apache.spark.sql.functions.{when, lit}

scala> df
  .selectExpr(columns:_*)  // First Transformation: collapse a1/a2 and c1/c2/c3
  .withColumn(             // Second Transformation: derive f from d
    "f",
    when(
      $"d".cast("int").isin(Seq(4, 2, 8, 12):_*),
      lit("1f8")
    ).otherwise("d8n")
  )
  .show(false)
Final Output
+---+---+---+---+---+---+
|a |c |b |d |e |f |
+---+---+---+---+---+---+
|1 |abc|2 |4 |7 |1f8|
|3 |def|4 |1 |9 |d8n|
+---+---+---+---+---+---+
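For completeness, here is a rough end-to-end sketch of the Parquet-to-JSON flow from the question, using the string values of d given there ("1d", "12", "a9", "df"); the input and output paths are placeholders, not from the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, when, lit}

val spark = SparkSession.builder().appName("ParquetToJson").getOrCreate()
import spark.implicits._

// Placeholder path: point this at the real Parquet input.
val input = spark.read.parquet("/path/to/input.parquet")

val transformed = input
  .select(                                  // first transformation: collapse the split columns
    coalesce($"a1", $"a2").as("a"),
    $"b",
    $"d",
    coalesce($"c1", $"c2", $"c3").as("c"),
    $"e"
  )
  .withColumn(                              // second transformation: derive f from d
    "f",
    when($"d".isin("1d", "12", "a9", "df"), lit("1f8")).otherwise(lit("d8n"))
  )

// Placeholder path: each output file contains one JSON object per line.
transformed.write.mode("overwrite").json("/path/to/output_json")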