Tags: scala, apache-spark, apache-spark-sql

Convert a DataFrame to a JSON string by grouping on a column


I have the DataFrame below.

+-------------------+-----------------+-----------------+
|PAYMENT_TYPE       |TRANSACTION_TYPE |TRANSACTION_COUNT|
+-------------------+-----------------+-----------------+
|OFFLINE            |CHEQUE           |5                |
|ONLINE             |CREDIT           |135              |
|ONLINE             |DEBIT            |297              |
+-------------------+-----------------+-----------------+

I have to turn this into a JSON string, grouped by PAYMENT_TYPE. Any idea how to do this?

{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}

I have tried the following:

res.groupBy($"PAYMENT_TYPE")
  .agg(collect_list(concat($"TRANSACTION_TYPE", lit(":"), $"TRANSACTION_COUNT")).as("json_data"))
  .show(false)

This gives the output below, but not in the desired format:

+-------------------+-----------------------+
|PAYMENT_TYPE       |json_data              |
+-------------------+-----------------------+
|OFFLINE            |[CHEQUE:5]             |
|ONLINE             |[CREDIT:135, DEBIT:297]|
+-------------------+-----------------------+

Thanks


Solution

  • Please find the solution below.

    The same result can also be achieved more easily with a UDF and a JSON library.

    Note: I have used multiple aggregate functions to achieve the expected result, which may impact performance.
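
    As a rough sketch of that UDF idea (assuming json4s, which ships with Spark, is available on the classpath), the "TYPE:COUNT" strings from the question's own attempt can be merged into a JSON object per group:

    import org.apache.spark.sql.functions._
    import org.json4s.jackson.Serialization

    // Untested sketch: turn a group's "TYPE:COUNT" strings into one JSON object string.
    val pairsToJson = udf { (pairs: Seq[String]) =>
      implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
      Serialization.write(
        pairs.map { p => val Array(k, v) = p.split(":", 2); k -> v.toInt }.toMap
      )
    }

    val inner = df
      .groupBy($"PAYMENT_TYPE")
      .agg(pairsToJson(
        collect_list(concat($"TRANSACTION_TYPE", lit(":"), $"TRANSACTION_COUNT"))
      ).as("json"))

    // inner now holds one JSON object string per payment type; one more
    // collect_list plus the same idea stitches them into the outer object.

    The built-in-functions route below avoids the UDF entirely.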

    val df = Seq(
      ("OFFLINE", "CHEQUE", 5),
      ("ONLINE", "CREDIT", 135),
      ("ONLINE", "DEBIT", 297)
    ).toDF("PAYMENT_TYPE", "TRANSACTION_TYPE", "TRANSACTION_COUNT")
    
    df.show(false)
    
    +------------+----------------+-----------------+
    |PAYMENT_TYPE|TRANSACTION_TYPE|TRANSACTION_COUNT|
    +------------+----------------+-----------------+
    |OFFLINE     |CHEQUE          |5                |
    |ONLINE      |CREDIT          |135              |
    |ONLINE      |DEBIT           |297              |
    +------------+----------------+-----------------+
    
    val mdf = df
      .groupBy($"PAYMENT_TYPE")
      .agg(
        // one single-entry map per row, e.g. Map(CHEQUE -> 5)
        collect_list(
          map(
            $"TRANSACTION_TYPE",
            $"TRANSACTION_COUNT"
          )
        ).as("data"))
      .selectExpr(
        "PAYMENT_TYPE",
        // merge the per-row maps into one; the dummy '' entry only seeds the
        // reduce and is filtered back out at every step
        "reduce(data, map('', ''), (acc, e) -> map_filter(map_concat(acc, e), (k, _) -> k != '')) as data"
      )
      // pivot the PAYMENT_TYPE values into columns of a single row
      .groupBy(lit(1))
      .pivot($"PAYMENT_TYPE")
      .agg(first($"data"))
      .drop("1")
    
    mdf.show(false)
    +-------------+-----------------------------+
    |OFFLINE      |ONLINE                       |
    +-------------+-----------------------------+
    |{CHEQUE -> 5}|{CREDIT -> 135, DEBIT -> 297}|
    +-------------+-----------------------------+
    
    mdf
      .select(
        // wrap each pivoted column as Map(columnName -> value), then merge them all
        mdf.columns
          .map(c => map(lit(c), col(c)))
          .reduce(map_concat(_, _))
          .as("json_data")
      )
      .select(to_json($"json_data").as("data"))
      .show(false)
    
    +------------------------------------------------------------------+
    |data                                                              |
    +------------------------------------------------------------------+
    |{"OFFLINE":{"CHEQUE":"5"},"ONLINE":{"CREDIT":"135","DEBIT":"297"}}|
    +------------------------------------------------------------------+
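
    For reference, a simpler variant (untested sketch, requires Spark 2.4+ for map_from_entries) skips both the empty-key reduce trick and the pivot, and should keep the counts numeric rather than quoted strings:

    import org.apache.spark.sql.functions._

    val result = df
      // build each payment type's inner map directly from (type, count) structs
      .groupBy($"PAYMENT_TYPE")
      .agg(map_from_entries(
        collect_list(struct($"TRANSACTION_TYPE", $"TRANSACTION_COUNT"))
      ).as("data"))
      // collapse all groups into a single nested map and render it as JSON
      .agg(to_json(
        map_from_entries(collect_list(struct($"PAYMENT_TYPE", $"data")))
      ).as("data"))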