Search code examples
scalaapache-sparkazure-databricks

How to create nested json using Apache Spark with Scala


I am trying to create nested JSON from my Spark DataFrame, which has data in the following structure.

Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4

The required JSON output, to be produced using Apache Spark with Scala, is in the format below.

[{
        "vendor_name": "Vendor 1",
        "count": 10,
        "categories": [{
            "name": "Category 1",
            "count": 4,
            "subCategories": [{
                    "name": "Sub Category 1",
                    "count": 1
                },
                {
                    "name": "Sub Category 2",
                    "count": 1
                },
                {
                    "name": "Sub Category 3",
                    "count": 1
                },
                {
                    "name": "Sub Category 4",
                    "count": 1
                }
            ]
        }]
}]

Solution

  • //Read the CSV file into a DataFrame, treating the first row as the header.
    //No schema is supplied or inferred, so every column (including the count columns) is loaded as a string.
    scala> val df = spark.read.format("csv").option("header", "true").load(<input CSV path>)
        df: org.apache.spark.sql.DataFrame = [Vendor_Name: string, count: string ... 4 more fields]
    
        scala> df.show(false)
        +-----------+-----+----------+--------------+--------------+-----------------+
        |Vendor_Name|count|Categories|Category_Count|Subcategory   |Subcategory_Count|
        +-----------+-----+----------+--------------+--------------+-----------------+
        |Vendor1    |10   |Category 1|4             |Sub Category 1|1                |
        |Vendor1    |10   |Category 1|4             |Sub Category 2|2                |
        |Vendor1    |10   |Category 1|4             |Sub Category 3|3                |
        |Vendor1    |10   |Category 1|4             |Sub Category 4|4                |
        +-----------+-----+----------+--------------+--------------+-----------------+
    
        //Convert into the desired JSON shape using two aggregation passes:
        //  1) per (vendor, vendor count, category, category count): collect each subcategory as a {name, count} struct into "subCategories";
        //  2) per (vendor, vendor count): collect each category as a {name, count, subCategories} struct into "categories".
        //NOTE: the count fields are still strings at this point (see the printed schema below), and the top-level key stays "Vendor_Name";
        //cast the counts (e.g. .cast("int")) and alias the vendor column if the output must match the requested JSON exactly.
        scala> val df1 = df.groupBy("Vendor_Name","count","Categories","Category_Count").agg(collect_list(struct(col("Subcategory").alias("name"),col("Subcategory_Count").alias("count"))).alias("subCategories")).groupBy("Vendor_Name","count").agg(collect_list(struct(col("Categories").alias("name"),col("Category_Count").alias("count"),col("subCategories"))).alias("categories"))
        df1: org.apache.spark.sql.DataFrame = [Vendor_Name: string, count: string ... 1 more field]
    
        scala> df1.printSchema
        root
         |-- Vendor_Name: string (nullable = true)
         |-- count: string (nullable = true)
         |-- categories: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- name: string (nullable = true)
         |    |    |-- count: string (nullable = true)
         |    |    |-- subCategories: array (nullable = true)
         |    |    |    |-- element: struct (containsNull = true)
         |    |    |    |    |-- name: string (nullable = true)
         |    |    |    |    |-- count: string (nullable = true)
    
        //Write df1 in JSON format under the output path; "append" mode adds to any data already present there.
        //NOTE: Spark's JSON writer emits one JSON object per line (one per vendor row), not a single enclosing [ ... ] array.
        scala> df1.write.format("json").mode("append").save(<output Path>)