I have the DataFrame below:
+-------------------+-----------------+-----------------+
|PAYMENT_TYPE |TRANSACTION_TYPE |TRANSACTION_COUNT|
+-------------------+-----------------+-----------------+
|OFFLINE |CHEQUE |5 |
|ONLINE |CREDIT |135 |
|ONLINE |DEBIT |297 |
+-------------------+-----------------+-----------------+
I need to turn this into a single JSON string, grouped by PAYMENT_TYPE, in the following format. Any idea how to do this?
{"OFFLINE": {"CHEQUE": 5}, "ONLINE": {"CREDIT": 135, "DEBIT": 297}}
I have tried the following (note that `.show` returns Unit, so it is called separately rather than assigned to `data`):
val data = res.groupBy($"PAYMENT_TYPE").agg(collect_list(concat($"TRANSACTION_TYPE", lit(":"), $"TRANSACTION_COUNT")).as("json_data"))
data.show(false)
This gives me the output below, but not the format above:
+-------------------+-----------------------+
|PAYMENT_TYPE |json_data |
+-------------------+-----------------------+
|OFFLINE |[CHEQUE:5] |
|ONLINE |[CREDIT:135, DEBIT:297]|
+-------------------+-----------------------+
Thanks
Please find a solution below. The same result can also be achieved, more simply, with a UDF and a JSON library.
Note:
- This chains several aggregations (a groupBy, then a pivot over a single group), which may affect performance on large data.
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("OFFLINE", "CHEQUE", 5),
  ("ONLINE", "CREDIT", 135),
  ("ONLINE", "DEBIT", 297)
).toDF("PAYMENT_TYPE", "TRANSACTION_TYPE", "TRANSACTION_COUNT")
df.show(false)
+------------+----------------+-----------------+
|PAYMENT_TYPE|TRANSACTION_TYPE|TRANSACTION_COUNT|
+------------+----------------+-----------------+
|OFFLINE |CHEQUE |5 |
|ONLINE |CREDIT |135 |
|ONLINE |DEBIT |297 |
+------------+----------------+-----------------+
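val mdf = df
  .groupBy($"PAYMENT_TYPE")
  .agg(
    // One single-entry map per row, e.g. {CREDIT -> 135}
    collect_list(
      map(
        $"TRANSACTION_TYPE",
        $"TRANSACTION_COUNT".cast("string") // same value type as the seed map('', '') below
      )
    ).as("data"))
  // Fold the array of maps into one map; map_filter drops the seed entry '' -> ''
  .selectExpr(
    "PAYMENT_TYPE",
    "aggregate(data, map('', ''), (acc, e) -> map_filter(map_concat(acc, e), (k, _) -> k != '')) as data"
  )
  // Put everything in one group and turn each PAYMENT_TYPE into its own column
  .groupBy(lit(1))
  .pivot($"PAYMENT_TYPE")
  .agg(first($"data"))
  .drop("1")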
mdf.show(false)
+-------------+-----------------------------+
|OFFLINE |ONLINE |
+-------------+-----------------------------+
|{CHEQUE -> 5}|{CREDIT -> 135, DEBIT -> 297}|
+-------------+-----------------------------+
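To make the fold in the `selectExpr` step easier to follow, here is a rough plain-Scala analogue (no Spark needed). It folds a list of single-entry maps starting from the seed `"" -> ""`, concatenates at each step and filters out the empty key, just like `map_concat` + `map_filter`. The object and method names are illustrative only, and Scala's `++` differs from Spark on duplicate keys (noted in the comment).

```scala
object FoldSketch {
  // Rough analogue of the SQL lambda
  //   (acc, e) -> map_filter(map_concat(acc, e), (k, _) -> k != '')
  // folded over `data` with the seed map('', '').
  // Difference: in Spark 3.x map_concat fails on duplicate keys by default,
  // while Scala's ++ simply keeps the right-hand entry.
  def mergeMaps(data: Seq[Map[String, String]]): Map[String, String] =
    data.foldLeft(Map("" -> "")) { (acc, e) =>
      (acc ++ e).filter { case (k, _) => k != "" } // concat, then drop the seed entry
    }

  def main(args: Array[String]): Unit = {
    println(mergeMaps(Seq(Map("CHEQUE" -> "5"))))
    println(mergeMaps(Seq(Map("CREDIT" -> "135"), Map("DEBIT" -> "297"))))
  }
}
```

The seed only exists because the fold needs a typed starting value; the filter removes it on the first step.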
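mdf
  .select(
    // One map column: each pivoted column name (payment type) -> its transaction map
    mdf
      .columns
      .map(c => map(lit(c), col(c)))
      .reduce(map_concat(_, _))
      .as("json_data")
  )
  // Serialize the nested map into a single JSON string
  .select(to_json($"json_data").as("data"))
  .show(false)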
+------------------------------------------------------------------+
|data |
+------------------------------------------------------------------+
|{"OFFLINE":{"CHEQUE":"5"},"ONLINE":{"CREDIT":"135","DEBIT":"297"}}|
+------------------------------------------------------------------+
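The counts above come out quoted ("5" rather than 5) because they pass through string-typed maps (the fold's seed map('', '') is string-to-string). If numeric values are needed, as in the question, the following is a minimal sketch, assuming Spark 3.x, that avoids both the fold and the pivot: `map_from_entries` builds the inner map directly from collected structs, and a second, global aggregation builds the outer map. The object `PaymentJson` and its methods are hypothetical names used for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object PaymentJson {
  // Builds {"PAYMENT_TYPE": {"TRANSACTION_TYPE": count, ...}, ...} keeping counts numeric.
  def toNestedJson(df: DataFrame): String = {
    val perType = df
      .groupBy(col("PAYMENT_TYPE"))
      .agg(
        // array_sort orders the entries by TRANSACTION_TYPE so the inner key order is deterministic
        map_from_entries(
          array_sort(collect_list(struct(col("TRANSACTION_TYPE"), col("TRANSACTION_COUNT"))))
        ).as("data")
      )

    perType
      // Global aggregation: a single row holding PAYMENT_TYPE -> inner map
      .agg(map_from_entries(collect_list(struct(col("PAYMENT_TYPE"), col("data")))).as("json_data"))
      .select(to_json(col("json_data")).as("data"))
      .head()
      .getString(0)
  }

  def run(): String = {
    val spark = SparkSession.builder().master("local[1]").appName("payment-json").getOrCreate()
    import spark.implicits._
    try {
      val df = Seq(
        ("OFFLINE", "CHEQUE", 5),
        ("ONLINE", "CREDIT", 135),
        ("ONLINE", "DEBIT", 297)
      ).toDF("PAYMENT_TYPE", "TRANSACTION_TYPE", "TRANSACTION_COUNT")
      toNestedJson(df)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

`array_sort` only fixes the order of the inner keys; the order of the top-level OFFLINE/ONLINE keys after `collect_list` is not guaranteed, which does not change the meaning of the JSON object.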