Recently I have been trying to apply some transformations to JSON files in Azure Synapse notebooks using Scala, loading them with the spark.read function. The problem is the following:
I do not know what is happening, as I have tried to load several different JSONs and none of them work (they are ordinary JSONs downloaded from Kaggle, though). One of them looks like this:
{
"results": [{
"columns": [{
"name": "COD",
"type": "NUMBER"
}, {
"name": "TECH",
"type": "NUMBER"
}, {
"name": "OBJECT",
"type": "NUMBER"
}],
"items": [{
"cod": 3699,
"tech": "-",
"object": "Type 2"
}, {
"cod": 3700,
"tech": 56,
"object": "Type 1"
}, {
"cod": 3701,
"tech": 20,
"object": "No type"
}]
}]
}
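This is roughly how I am reading it (the path below is just a placeholder for where the file sits in my storage account); the load itself does not fail, but instead of the real fields I only get corrupt-record data back:
//reading the file with no extra options
val df = spark.read.format("json").load("abfss://<container>@<account>.dfs.core.windows.net/sample.json")
df.show()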
I got a similar corrupt-record error when I tried to reproduce this.
The sample data you shared spreads a single object over multiple lines and nests several objects inside it, so to read this JSON file I set the multiline
option to true,
and then exploded each column and selected its fields.
//explode, col and lit used below come from the Spark SQL functions object
import org.apache.spark.sql.functions._

//reading the JSON file from ADLS in JSON format
val read_path = "abfss://fsn2p@dlsg2p.dfs.core.windows.net/sample.json"
val customers_data_path = spark.read.format("json").option("inferSchema","true").option("multiline","true").load(read_path)
customers_data_path.show();
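With multiline set to true the read succeeds; for the sample above the inferred schema should come out roughly as below (tech is inferred as a string because one of its values is "-"):
customers_data_path.printSchema()
//root
// |-- results: array of struct
// |      |-- columns: array of struct<name: string, type: string>
// |      |-- items: array of struct<cod: long, object: string, tech: string>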
//Exploding the results array so each of its elements becomes a row
val DF1=customers_data_path.select(explode(col("results")).as("results"))
DF1.show()
//Selecting all columns from results
val DF2 = DF1.select(col("results.*"))
DF2.show();
//further exploding the columns array, then the items array
val DF3 = DF2.select(explode(col("columns")).as("columns"),col("items"))
val DF4 = DF3.select(col("columns"),explode(col("items")).as("items"))
DF4.show();
//selecting all fields inside the columns and items structs (the two explodes above pair every column definition with every item, i.e. a cross product)
val DF5 = DF4.select(col("columns.*"),col("items.*"))
DF5.show();
//alternatively, exploding the columns and items arrays separately
val DFColumns = DF2.select(explode(col("columns")).as("columns"))
DFColumns.show();
val DFItems = DF2.select(explode(col("items")).as("items"))
DFItems.show();
//selecting all fields inside the columns and items structs
val DF7 = DFColumns.select(col("columns.*"))
val DF8 = DFItems.select(col("items.*"))
DF7.show();
DF8.show();
//combining both the above dataframes (a full outer join on an always-false condition only stacks the rows with nulls, it does not align them)
val DF10 = DF7.join(DF8, lit(false), "full")
DF10.show()
//to align the rows instead, create a sequential index column on each dataframe, join on it, and then drop it
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType};
import spark.implicits._
import org.apache.spark.sql.Row
val df11 = spark.createDataFrame(
DF7.rdd.zipWithIndex.map {
case (row, index) => Row.fromSeq(row.toSeq :+ index)
},
// Create schema for index column
StructType(DF7.schema.fields :+ StructField("index", LongType, false))
)
val df22 = spark.createDataFrame(
DF8.rdd.zipWithIndex.map {
case (row, index) => Row.fromSeq(row.toSeq :+ index)
},
// Create schema for index column
StructType(DF8.schema.fields :+ StructField("index", LongType, false))
)
val DF12 = df11.join(df22, Seq("index")).drop("index")
DF12.show()
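The zipWithIndex approach works, but posexplode can give the same positional alignment in one step, because it returns each element's position alongside the element. A minimal sketch (the val names here are illustrative, and it assumes a single element in results, as in the sample, so the positions line up one-to-one):
//posexplode emits (position, element) pairs, so the two explodes can simply be joined on the position
val colsByPos = DF2.select(posexplode(col("columns")).as(Seq("pos", "columns")))
val itemsByPos = DF2.select(posexplode(col("items")).as(Seq("pos", "items")))
val combined = colsByPos.join(itemsByPos, Seq("pos")).drop("pos").select(col("columns.*"), col("items.*"))
combined.show()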