The following example shows that Spark allows appending to a Parquet dataset even if the data to be appended has an extra column (a different schema). Is there a way to prevent this from happening? Since a Parquet file stores its schema, is there an automatic way to enforce this?
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import os
# imports for random dataset creation
import random
import string
spark = SparkSession.builder.appName('learn').master('yarn').enableHiveSupport().getOrCreate()
# first save
schema1 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
])
data1 = [[random.randint(0, 5), ''.join(random.choice(string.ascii_lowercase) for _ in range(10))] for _ in range(10)]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.write.format('parquet').mode('overwrite').save('/tmp/df1_2')
# df1.show()
# +---------+-----------+
# |id_inside|name_inside|
# +---------+-----------+
# | 0| krohfjcwwo|
# | 0| cvkwmuddxf|
# |        5| rsxtjdfwjv|
# ...
os.system('hadoop fs -ls /tmp/df1_2')
# Found 3 items
# -rw-r--r-- 3 sdap hdfs 0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r-- 3 sdap hdfs 723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# append data using a different schema
schema2 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
    StructField('name_inside2', StringType(), nullable=False),
])
# the same random string is reused for both name columns
data2 = [
    [random.randint(0, 5)] + [''.join(random.choice(string.ascii_lowercase) for _ in range(10))] * 2
    for _ in range(10)
]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.write.format('parquet').mode('append').save('/tmp/df1_2')
os.system('hadoop fs -ls /tmp/df1_2')
# Found 5 items
# -rw-r--r-- 3 sdap hdfs 0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r-- 3 sdap hdfs 1006 2021-07-30 17:58 /tmp/df1_2/part-00000-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 1003 2021-07-30 17:58 /tmp/df1_2/part-00001-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r-- 3 sdap hdfs 720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# read the file back in and check the schema
res = spark.read.format('parquet').load('/tmp/df1_2')
res.sample(withReplacement=False, fraction=0.1).show()
# +---------+-----------+------------+
# |id_inside|name_inside|name_inside2|
# +---------+-----------+------------+
# | 5| gmafmuprti| gmafmuprti|
# | 3| ttshihunbe| ttshihunbe|
# | 2| dlrpqnzwrz| null|
# +---------+-----------+------------+
res.printSchema()
# root
# |-- id_inside: long (nullable = true)
# |-- name_inside: string (nullable = true)
# |-- name_inside2: string (nullable = true)
No, there is no built-in option for plain Parquet output: accepting new columns on append is the whole point of schema evolution. You would need to compare the existing schema with the new one yourself, with explicit if logic, before appending.
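A minimal sketch of such a guard follows (the helper name append_if_schema_matches is made up for illustration, not a Spark API; it assumes the target path already contains a first write):

def append_if_schema_matches(df, path):
    # Reading the existing dataset only needs the Parquet footers to
    # resolve the schema, so this is cheap.
    existing = spark.read.format('parquet').load(path)
    # Compare names and types only: as the printSchema() output above shows,
    # a Parquet round trip reports every column as nullable = true, so full
    # StructType equality would be too strict.
    existing_cols = [(f.name, f.dataType) for f in existing.schema.fields]
    new_cols = [(f.name, f.dataType) for f in df.schema.fields]
    if existing_cols != new_cols:
        raise ValueError(f'schema mismatch: existing {existing_cols}, new {new_cols}')
    df.write.format('parquet').mode('append').save(path)

append_if_schema_matches(df2, '/tmp/df1_2')
# raises ValueError, because df2 carries the extra name_inside2 column

With this guard, the append in your example fails instead of silently widening the dataset, while appends with an identical column list still go through.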