I encountered an issue while trying to store JSON data as a Delta Lake table using PySpark and Delta Lake.
Here's my code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *
delta_version = "2.4.0"
spark = SparkSession.builder \
.appName("JSONToDeltaLake") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.jars.packages", f"io.delta:delta-core_2.12:{delta_version}") \
.getOrCreate()
json_data = """
[
{
"name": "John Doe",
"age": 30,
"city": "New York"
},
{
"name": "Jane Smith",
"age": 25,
"city": "Los Angeles"
}
]
"""
json_path = "example_data.json"
with open(json_path, "w") as file:
file.write(json_data)
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True)
])
try:
df = spark.read.schema(schema).json(json_path)
except Exception as e:
print(f"Error reading JSON file: {e}")
spark.stop()
exit(1)
df.printSchema()
df.show()
delta_path = "example_delta_table"
df.write.format("delta").mode("overwrite").save(delta_path)
delta_table = DeltaTable.forPath(spark, delta_path)
delta_df = delta_table.toDF()
delta_df.show()
spark.stop()
This code generates example JSON data, saves it to a file, reads the JSON data using PySpark, and then stores it as a Delta Lake table.
However, when I run the code, only null values are being stored in the Delta Lake table.
& C:/Users/no2si/AppData/Local/Programs/Python/Python311/python.exe c:/Users/no2si/Documents/MarketReSearch/TodayhomeScrape/deltalpp.py
:: loading settings :: url = jar:file:/C:/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\no2si\.ivy2\cache
The jars for the packages stored in: C:\Users\no2si\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 173ms :: artifacts dl 8ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/10ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+
24/06/07 13:58:56 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2175)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2175)
at org.apache.spark.SparkContext.stop(SparkContext.scala:2081)
at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:550)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:834)
24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
PS C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape> 24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
SUCCESS: The process with PID 7444 (child process of PID 14792) has been terminated.
SUCCESS: The process with PID 14792 (child process of PID 13788) has been terminated.
SUCCESS: The process with PID 13788 (child process of PID 1864) has been terminated.
What should I modify to resolve this issue? I would appreciate it if you could guide me on how to correctly store the JSON data in the Delta Lake table.
Additionally, I would also appreciate any advice on the causes and solutions for the warning and error messages that appeared in the logs.
Thank you.
I tried to verify that the JSON file was loading correctly and that the specified storage path was valid.
To check if the JSON file was loading properly, I added code to print the contents of the JSON file after saving it. This allowed me to confirm that the JSON data was being written to the file as expected.
Your json file is using more than one line, so you need to use the multiline option to read you file. Add the .option("multiline", "true") when you read the json file.
df = spark.read.schema(schema).option("multiline", "true").json(json_path)
References: