I'm working with PySpark within Synapse notebooks and I need to load a Parquet file into a DataFrame, apply some transformations (e.g., renaming columns), and then save the modified DataFrame back to the same location, overwriting the original file. However, when I try to save the DataFrame, it creates a directory with the name Test.parquet containing two files: one named SUCCESS and another with a string of random letters and numbers.
Here's the code I am using:
%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))
column_mapping = {
    "FullName": "Full Name",
}
for old_col, new_col in column_mapping.items():
    df = df.withColumnRenamed(old_col, new_col)
display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')
Here is how it overwrites the file: instead of a single Test.parquet file, I end up with a Test.parquet folder containing those files.
How can I correctly overwrite the original Parquet file without creating additional files or directories? Any help is appreciated!
The reason Spark does not save just a single Parquet file lies in its nature: it partitions data tables for concurrency purposes, so df.write always produces a folder of part files (compressed with the Snappy algorithm by default). The answer below is based on the Databricks filesystem (DBFS), so you may need to adapt the file paths to your environment.
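For illustration, a write like df.write.parquet('path/to/Test.parquet') typically leaves behind something like this (the part file name is generated, so it will differ on your side):

path/to/Test.parquet/
    _SUCCESS
    part-00000-&lt;generated id&gt;-c000.snappy.parquet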
What you need to do is:
- coalesce(1) to end up with a single part file in the folder made by df.write
- an os.listdir operation in the folder made by Spark to find that part file
- os.replace to move the part file to the target path
- shutil.rmtree to remove the temporary folder

The function below shows how you can achieve this.
from pyspark.sql import DataFrame
from pyspark.sql import functions as f
import os
import shutil
from typing import Union
from pathlib import Path
def get_local_path(path: Union[str, Path]) -> str:
    """
    Transforms a potential dbfs path to a
    path accessible by standard file system operations.

    :param path: Path to transform to a local path
    :return: The local path
    """
    return str(path).replace("dbfs:", "/dbfs")
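# For example, get_local_path("dbfs:/mnt/some-container/test.parquet") would return
# "/dbfs/mnt/some-container/test.parquet", a path that os and shutil can work with.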
def save_as_one_parquet_file(
    df: DataFrame,
    output_file_path: str,
):
    """
    Saves a Spark dataframe as a single parquet file.

    :param df: The Spark dataframe to write.
    :param output_file_path: The output file path for writing the parquet file.
    """
    local_path = get_local_path(output_file_path)
    tmp_output_path = output_file_path + "_temp"
    tmp_local_path = local_path + "_temp"

    # Write a single part file into a temporary folder
    (
        df.coalesce(1)
        .write.mode("overwrite")
        .format("parquet")
        .save(tmp_output_path)
    )

    # Find the single part file in the temporary folder
    part_file = [name for name in os.listdir(tmp_local_path) if name.endswith(".parquet")][0]

    # Move the part file to the target path (replacing the existing single file)
    # and remove the temporary folder
    os.replace(os.path.join(tmp_local_path, part_file), local_path)
    shutil.rmtree(tmp_local_path)
# Reading in
path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
df = spark.read.parquet(path)
# Transformations
df_new = df.withColumn("blub", f.lit(2))
# Saving as one parquet file
save_as_one_parquet_file(df_new, path)
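Applied to your example, it could look roughly like this (a sketch; it assumes 'path/to/Test.parquet' is also reachable through standard file system operations, e.g. via a mount, so get_local_path may need to be adapted for Synapse). Note that Spark may reject spaces in Parquet column names, so the rename here stays space-free:

path = 'path/to/Test.parquet'
# Read the original file
df = spark.read.parquet(path)
# Apply the transformations, e.g. renaming columns
df = df.withColumnRenamed("FullName", "full_name")
# Overwrite the original location with a single parquet file
save_as_one_parquet_file(df, path)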