The use case is to append a column to a Parquet dataset and then rewrite it efficiently at the same location. Here is a minimal example.
Create a pandas DataFrame and write it as a partitioned Parquet dataset.
import pandas as pd
df = pd.DataFrame({
'id': ['a','a','a','b','b','b','b','c','c'],
'value': [0,1,2,3,4,5,6,7,8]})
path = r'c:/data.parquet'
df.to_parquet(path=path, engine='pyarrow', compression='snappy', index=False, partition_cols=['id'], flavor='spark')
Then load the Parquet dataset as a pyspark view and create a modified dataset as a pyspark DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.read.parquet(path).createTempView('data')
sf = spark.sql(f"""SELECT id, value, 0 AS segment FROM data""")
At this point the data in sf is the same as the data in df, but with an additional segment column of all zeros. I would like to efficiently overwrite the existing Parquet dataset at path with sf, as a Parquet dataset in the same location. Below is what does not work. I would also prefer not to write sf to a new location, delete the old Parquet dataset, and rename, as that does not seem efficient.
# saves existing data and new data
sf.write.partitionBy('id').mode('append').parquet(path)
# immediately deletes existing data then crashes
sf.write.partitionBy('id').mode('overwrite').parquet(path)
My answer in short: you shouldn't :\
One principle of big data (and Spark is for big data) is to never overwrite stuff. Sure, .mode('overwrite') exists, but this is not a correct usage of it.
My guesses as to why it could (should) fail:
What I usually do in such a situation is to create another dataset, and when there is no reason to keep the old one (i.e. when the processing is completely finished), clean it up. To remove files, you can check this post on how to delete HDFS files. It should work for all files accessible by Spark. However, it is in Scala, so I'm not sure if it can be adapted to pyspark.
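To make the "write elsewhere, then clean up" idea concrete, here is a minimal sketch for a local filesystem only. The helper name replace_dataset and its write_fn callback are hypothetical, introduced just for illustration. It assumes the dataset is a directory (as a partitioned Parquet dataset is) and that the new data is written to a sibling path on the same filesystem, so the final rename is cheap.

```python
import os
import shutil
import tempfile


def replace_dataset(path, write_fn):
    """Write a new dataset next to `path`, then swap it in for the old one.

    `write_fn(tmp_path)` is a hypothetical callback that must completely
    write the new dataset (a directory) to `tmp_path` before returning.
    """
    path = os.path.abspath(path)
    parent = os.path.dirname(path)

    # Reserve a unique sibling name, then remove the empty directory so the
    # writer can create it itself (a writer may refuse an existing target).
    tmp_path = tempfile.mkdtemp(prefix=os.path.basename(path) + '.tmp-', dir=parent)
    os.rmdir(tmp_path)

    try:
        write_fn(tmp_path)
    except Exception:
        # The write failed: drop the partial output and keep the old data.
        shutil.rmtree(tmp_path, ignore_errors=True)
        raise

    # Only now that the new data is fully written is the old dataset removed.
    if os.path.exists(path):
        shutil.rmtree(path)
    os.rename(tmp_path, path)
```

With the sf from the question, the callback could be something like lambda p: sf.write.partitionBy('id').parquet(p), so that Spark finishes reading the old files before they are deleted. On HDFS or other remote storage the delete and rename steps would have to go through the Hadoop FileSystem API instead of os and shutil.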
Note that efficiency is not a good reason to overwrite: it does more work than simply writing.