How to convert JSON object as a value in a column in SPARK AZURE-DATABRICKS using SCALA as per requirement

Guys Can someone help me in resolving the below problem:

I have a spark dataframe in which one of the column's values is coming in JSON format. Please find the below screen shot for your reference: Column - ratings

My requirement is as follows:

  1. I have to select the "average_rating" attribute which is not null out of all the 3 objects in a single row. If there is none I take null. I have to add this value to the column "Average_Rating". I have to select the "STATUS" from the object where "average_rating" is not null and add the value to a column "average_rating_status".
  2. I have to select the "number_of_recent_voters" attribute which is not null out of all the 3 objects in a single row. If there is none I take null. I have to add this value to the column "number_of_recent_voters". I have to select the "STATUS" from the object where "number_of_recent_voters" is not null and add the value to a column "number_of_recent_voters_status".
  3. I have to select the "number_of_voters" attribute which is not null out of all the 3 objects in a single row. If there is none I take null. I have to add this value to the column "number_of_voters". I have to select the "STATUS" from the object where "number_of_voters" is not null and add the value to a column "number_of_voters_status".

I have to write the code in scala in my Azure data bricks notebook. Can anyone please help with the code.

Thank you


| compliance | average_rating | number_of_recent_voters | number_of_voters | average_rating_status | number_of_recent_voters_status | number_of_voters_status |
| true       | 4.7            | 254                     | 254              | PASS                  | PASS                           | PASS                    |

Output should come like above.


  • Code

    val data = List(
      (true, """[{"average_rating":4.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":254,"status":"FAIL"},{"average_rating":null,"number_of_recent_voters":254,"number_of_voters":null,"status":"PASS"}]"""),
      (true, """[{"average_rating":2.7,"number_of_recent_voters":null,"number_of_voters":null,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":null,"number_of_voters":123,"status":"PASS"},{"average_rating":null,"number_of_recent_voters":324,"number_of_voters":null,"status":"PASS"}]""")
    ).toDF("compliance", "rating")
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{from_json, row_number}
    val windowSpec = Window.orderBy("rating")
    val data1 = data.withColumn("row_number", row_number.over(windowSpec))
    val data2 = data1.selectExpr("compliance", "row_number", """inline_outer(from_json(rating, 'ARRAY<STRUCT<average_rating DOUBLE, number_of_recent_voters DOUBLE, number_of_voters DOUBLE, status STRING>>'))""")
    val data3 = data2.groupBy("row_number").max("average_rating","number_of_recent_voters","number_of_voters").withColumnRenamed("max(average_rating)","average_rating").withColumnRenamed("max(number_of_recent_voters)","number_of_recent_voters").withColumnRenamed("max(number_of_voters)","number_of_voters")
    val data4 = data3.join(data2, Seq("row_number", "average_rating"), "inner").select(data3.col("*"), data2.col("status")).withColumnRenamed("status","average_rating_status")
    val data5 = data4.join(data2, Seq("row_number","number_of_recent_voters"),"inner").select(data4.col("*"),data2.col("status")).withColumnRenamed("status","number_of_recent_voters_status")
    val data6 = data5.join(data2, Seq("row_number","number_of_voters"),"inner").select(data5.col("*"),data2.col("status")).withColumnRenamed("status","number_of_voters_status")

    This code adds a row_number column to the dataframe using the row_number function and the window specification defined on rating field. It uses the from_json function to parse the rating column as an array of structs, and then uses the inline_outer function to explode the array into separate rows. It groups the resulting dataframe by the row_number column and calculates the maximum value of the average_rating, number_of_recent_voters, and number_of_voters columns for each group. It then joins the resulting dataframe with the original dataframe on the row_number and average_rating columns, and selects the status column from the original dataframe. It renames the status column to average_rating_status. Similarly, it is done for the number_of_recent_voters and number_of_voters columns, and renames the status columns to number_of_recent_voters_status and number_of_voters_status, respectively.

    The resulting dataframe data6 has the required results.