Tags: scala, dataframe, apache-spark, row

Merge rows in a DataFrame by id, avoiding null values in columns (Spark Scala)


I am developing in Spark with Scala, and I would like to merge some rows in a DataFrame.

My DataFrame is the following:

+-------------------------+-------------------+---------------+------------------------------+
|name                     |col1               |col2           |col3                          |
+-------------------------+-------------------+---------------+------------------------------+
|                    a    |               null|           null|                      0.000000|
|                    a    |           0.000000|           null|                          null|
|                    b    |               null|           null|                      0.000000|
|                    b    |         300.000000|           null|                          null|
+-------------------------+-------------------+---------------+------------------------------+

And I want to turn it into the following DataFrame:

+-------------------------+-------------------+---------------+------------------------------+
|name                     |col1               |col2           |col3                          |
+-------------------------+-------------------+---------------+------------------------------+
|                    a    |           0.000000|           null|                      0.000000|
|                    b    |         300.000000|           null|                      0.000000|
+-------------------------+-------------------+---------------+------------------------------+

Taking into account:

- Some columns can have all values null.

- There can be a lot of columns in the DataFrame.


As far as I know, I have to use groupBy with agg(), but I am unable to get the correct expression:

df.groupBy("name").agg()

Solution

  • If "merge" means sum, the column list can be taken from the DataFrame schema and passed to "agg":

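    import org.apache.spark.sql.functions.sum
    import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

    // Sample data matching the question; Option models the nullable Double columns
    val df = Seq(
      ("a", Option.empty[Double], Option.empty[Double], Some(0.000000)),
      ("a", Some(0.000000), Option.empty[Double], Option.empty[Double]),
      ("b", Option.empty[Double], Option.empty[Double], Some(0.000000)),
      ("b", Some(300.000000), Option.empty[Double], Option.empty[Double])
    ).toDF(
      "name", "col1", "col2", "col3"
    )

    // Build one sum aggregation per column, skipping the grouping key
    val columnsToMerge = df
      .columns
      .filterNot(_ == "name")
      .map(c => sum(c).alias(c))

    // sum ignores nulls and yields null when every value in a group is null
    df.groupBy("name")
      .agg(columnsToMerge.head, columnsToMerge.tail: _*)
      .show(false)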

    Result:

    +----+-----+----+----+
    |name|col1 |col2|col3|
    +----+-----+----+----+
    |a   |0.0  |null|0.0 |
    |b   |300.0|null|0.0 |
    +----+-----+----+----+
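
  • If "merge" instead means "keep a non-null value per column" rather than adding values up, the same schema-driven approach should work with first(..., ignoreNulls = true) in place of sum. The following is only a sketch under assumptions: it builds its own local SparkSession for illustration, and it assumes each group has at most one non-null value per column (when there are several, which one first returns is not guaranteed after a shuffle).

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, first}

    // Assumption: a local session for illustration; reuse your existing one in practice
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("merge-rows-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same sample data as in the question
    val df = Seq(
      ("a", Option.empty[Double], Option.empty[Double], Some(0.000000)),
      ("a", Some(0.000000), Option.empty[Double], Option.empty[Double]),
      ("b", Option.empty[Double], Option.empty[Double], Some(0.000000)),
      ("b", Some(300.000000), Option.empty[Double], Option.empty[Double])
    ).toDF("name", "col1", "col2", "col3")

    // One aggregation per non-key column: a non-null value of the group,
    // or null when the whole group is null for that column
    val firstNonNull = df
      .columns
      .filterNot(_ == "name")
      .map(c => first(col(c), ignoreNulls = true).alias(c))

    val merged = df.groupBy("name")
      .agg(firstNonNull.head, firstNonNull.tail: _*)

    merged.show(false)

    Unlike sum, this variant also applies to non-numeric columns such as strings, which may matter when the DataFrame has many columns of mixed types.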