apache-spark, pyspark, parquet

Is it possible to prevent appending to a file if the schema is not correct?


The following example shows that Spark allows appending to an existing Parquet file even when the data being appended has one extra column (i.e. a different schema). Is there a way to prevent this from happening? Since a Parquet file embeds its own schema, is there an automatic way to achieve this?

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import os
# imports for random dataset creation
import random
import string
spark = SparkSession.builder.appName('learn').master('yarn').enableHiveSupport().getOrCreate()
# first save
schema1 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
])
data1 = [[random.randint(0, 5), ''.join(random.choice(string.ascii_lowercase) for _ in range(10))] for _ in range(10)]
df1 = spark.createDataFrame(data1, schema=schema1)
df1.write.format('parquet').mode(saveMode='overwrite').save('/tmp/df1_2')
# df1.show()
# +---------+-----------+
# |id_inside|name_inside|
# +---------+-----------+
# |        0| krohfjcwwo|
# |        0| cvkwmuddxf|
# |        5| rsxtjdfwjv|
os.system('hadoop fs -ls /tmp/df1_2')
# Found 3 items
# -rw-r--r--   3 sdap hdfs          0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r--   3 sdap hdfs        723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet

# append data using a different schema
schema2 = StructType([
    StructField('id_inside', LongType(), nullable=False),
    StructField('name_inside', StringType(), nullable=False),
    StructField('name_inside2', StringType(), nullable=False),
])
# each row repeats the same random string in both name columns
data2 = [[random.randint(0, 5)] + [''.join(random.choice(string.ascii_lowercase) for _ in range(10))] * 2 for _ in range(10)]
df2 = spark.createDataFrame(data2, schema=schema2)
df2.write.format('parquet').mode(saveMode='append').save('/tmp/df1_2')
os.system('hadoop fs -ls /tmp/df1_2')
# Found 5 items
# -rw-r--r--   3 sdap hdfs          0 2021-07-30 17:58 /tmp/df1_2/_SUCCESS
# -rw-r--r--   3 sdap hdfs       1006 2021-07-30 17:58 /tmp/df1_2/part-00000-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        723 2021-07-30 17:58 /tmp/df1_2/part-00000-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs       1003 2021-07-30 17:58 /tmp/df1_2/part-00001-d5372b57-4173-401d-94e2-c78d3b5c395c-c000.snappy.parquet
# -rw-r--r--   3 sdap hdfs        720 2021-07-30 17:58 /tmp/df1_2/part-00001-e353d0d9-ff1e-4436-8a2f-95fcf69eb0ca-c000.snappy.parquet


# read the file back in and check the schema
res = spark.read.format('parquet').load('/tmp/df1_2')
res.sample(withReplacement=False, fraction=0.1).show()
# +---------+-----------+------------+
# |id_inside|name_inside|name_inside2|
# +---------+-----------+------------+
# |        5| gmafmuprti|  gmafmuprti|
# |        3| ttshihunbe|  ttshihunbe|
# |        2| dlrpqnzwrz|        null|
# +---------+-----------+------------+
res.printSchema()
# root
#  |-- id_inside: long (nullable = true)
#  |-- name_inside: string (nullable = true)
#  |-- name_inside2: string (nullable = true)

Solution

  • No. Rejecting appends with a different schema would go against the whole notion of schema evolution, which Parquet is designed to support. You would need to compare the existing schema with the new one yourself, with if logic, and only append when they match.
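
    A minimal sketch of that "compare, then append" guard, for illustration: the helper name append_if_schema_matches is made up, it reuses the spark session from the script above, and it compares only column names and types, since a Parquet read-back reports every column as nullable (see the printSchema output above), so comparing full StructTypes would flag a false mismatch on nullability.

    from pyspark.sql import DataFrame
    from pyspark.sql.utils import AnalysisException

    def _names_and_types(schema):
        # reduce a StructType to (name, type) pairs, ignoring nullability
        return [(f.name, f.dataType) for f in schema.fields]

    def append_if_schema_matches(df: DataFrame, path: str) -> None:
        try:
            existing = spark.read.format('parquet').load(path).schema
        except AnalysisException:
            existing = None  # nothing at the path yet; the first write is fine
        if existing is not None and _names_and_types(df.schema) != _names_and_types(existing):
            raise ValueError(f'schema mismatch: existing {existing.simpleString()} '
                             f'vs new {df.schema.simpleString()}')
        df.write.format('parquet').mode(saveMode='append').save(path)

    append_if_schema_matches(df2, '/tmp/df1_2')  # raises ValueError instead of appending

    Note this check is not atomic: another writer could append between the read and the write, so it prevents accidents rather than enforcing a guarantee.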