Search code examples
pythonscalaapache-sparkapache-spark-sqlparquet

Spark parse and processing file parquet/json


I have some large data, in parquet format. I'd like to use Spark in Scala to transform this file and save into json format. My input file looks something like this:

a1,   a2,  b,  d,    c1,   c2,    c3,   e (header)
1,   null, 2, "1d", "abc", null, null,  7 (row1)
null, 3,   4, "2c", null,  null, "def", 9 (row2)
... more rows to follow

a1 and a2 are essentially the same column, for each row, only one of them is not null, the same applies for c1, c2 and c3, and I'd like to process my input file with two transformations, the first one is to form the correct a and c column for each row and remove the null ones, so the output will look something like this:

a,  b,   d,   c,    e (header)
1,  2, "1d", "abc", 7 (row1)
3,  4, "2c", "def", 9 (row2)
... more rows to follow

The 2nd transformation that I'd like to do is: based on column d, if the value of column d falls into one set of values ("1d","12","a9","df"), I'd like to add one more value ("1f8") into this row, if not, add the other value ("d8n"), the output will look like this after the second transformation:

a,  b,   d,   c,    e,   f   (header)
1,  2, "1d", "abc", 7, "1f8" (row1)
3,  4, "2c", "def", 9, "d8n" (row2)
... more rows to follow

Note: column f is newly added based on the value of d.


Solution

  • Sample Data

    scala> df.show(false)
    +----+----+---+---+----+----+----+---+
    |a1  |a2  |b  |d  |c1  |c2  |c3  |e  |
    +----+----+---+---+----+----+----+---+
    |1   |null|2  |4  |abc |null|null|7  |
    |null|3   |4  |1  |null|null|def |9  |
    +----+----+---+---+----+----+----+---+
    

    First Transformation, applying nvl function to columns a1,a2,c1,c2,c3 dynamic based length of the column name.

    scala> val columns = (
        df
        .columns
        .filter(_.length > 1)
        .groupBy(c => c(0))
        .map(c => 
            if(c._2.length > 2) 
                s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString("nvl(",",",")")} as ${c._1}" 
            else 
                s"${c._2.sliding(2).map(c => s"nvl(${c.head},${c.last})").mkString} as ${c._1}"
            ) ++ 
        df.columns.filter(_.length == 1)
    ).toSeq
    
    

    Above code will combine columns like below

    columns: Seq[String] = List(nvl(a1,a2) as a, nvl(nvl(c1,c2),nvl(c2,c3)) as c, b, d, e)
    
    scala> 
        df
        .selectExpr(columns:_*)
        .withColumn(
            "f",
            when(
                $"d".cast("int").isin(Seq(4,2,8,12):_*),
                lit("1f8")
            ).otherwise("d8n")
        ) // Second Transformation
        .show(false)
    

    Final Output

    +---+---+---+---+---+---+
    |a  |c  |b  |d  |e  |f  |
    +---+---+---+---+---+---+
    |1  |abc|2  |4  |7  |1f8|
    |3  |def|4  |1  |9  |d8n|
    +---+---+---+---+---+---+