
insert json file data to sql table using sparksql


I'm trying to insert JSON file data into a SQL table using Spark SQL. Here is a sample of my JSON file:

{
    "id": "value_string",
    "aggregate_id": "value_string",
    "type": "value_string",
    "timestamp": "value_string",
    "data": {
        "customer_id": "value_string",
        "name": "value_string"
    }
}

I want to insert this into a SQL table using Spark. I tried modeling it with the classes below, but it didn't work:

    public class DataOfPerson
    {
        public string name { get; set; }
        public string birthdate { get; set; }
        public string customer_id { get; set; }
    }

    public class Person
    {
        public string id { get; set; }
        public string aggregate_id { get; set; }
        public string type { get; set; }
        public string timestamp { get; set; }
        public List<DataOfPerson> dataOfPerson { get; set; }
    }

    public class RootObject
    {
        public Person person { get; set; }
    }

    var root = JsonConvert.DeserializeObject<RootObject>(sqlContext.read.json(s"abfss://abc@xyz/events.json"));

Solution

First flatten the nested JSON into top-level columns (this handles one level of struct nesting, which is all the sample document has):

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.{DataFrame, SparkSession}

    def flattenDataFrame(spark: SparkSession, nestedDf: DataFrame): DataFrame = {
      // Split the top-level columns into struct columns and already-flat columns
      var flatCols = Array.empty[String]
      var nestedCols = Array.empty[String]
      for (w <- nestedDf.dtypes) {
        if (w._2.contains("Struct")) {
          nestedCols = nestedCols :+ w._1
        } else {
          flatCols = flatCols :+ w._1
        }
      }

      // Expand each struct column into its child columns, e.g. "data.customer_id"
      var nestedCol = Array.empty[String]
      for (nc <- nestedCols) {
        for (c <- nestedDf.select(nc + ".*").columns) {
          nestedCol = nestedCol :+ (nc + "." + c)
        }
      }

      // Select the flat columns plus the expanded struct fields in one pass
      val allColumns = flatCols ++ nestedCol
      val colNames = allColumns.map(name => col(name))
      nestedDf.select(colNames: _*)
    }
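
With the DataFrame flattened, the rows can be pushed to a SQL table over JDBC. Below is a minimal usage sketch; the JDBC URL, table name, and credentials are placeholder assumptions, not values from the question:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("json-to-sql").getOrCreate()

    // Spark infers the schema, including the nested "data" struct
    val nestedDf = spark.read.json("abfss://abc@xyz/events.json")

    // Flatten data.customer_id and data.name into top-level columns
    val flatDf = flattenDataFrame(spark, nestedDf)

    // Append the flattened rows to the target SQL table over JDBC
    flatDf.write
      .format("jdbc")
      .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>") // placeholder
      .option("dbtable", "dbo.events")                                 // placeholder
      .option("user", "<user>")                                        // placeholder
      .option("password", "<password>")                                // placeholder
      .mode(SaveMode.Append)
      .save()

SaveMode.Append adds the rows on each run; use SaveMode.Overwrite instead if the table should be replaced.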