Tags: pyspark, azure-synapse, azure-synapse-analytics, azure-notebooks

How to Overwrite a Parquet File in the Same Location Using PySpark


I'm working with PySpark in Synapse notebooks. I need to load a Parquet file into a DataFrame, apply some transformations (e.g., renaming columns), and then save the modified DataFrame back to the same location, overwriting the original file. However, when I save the DataFrame, Spark creates a directory named Test.parquet containing two files: one named _SUCCESS and another whose name is a string of random letters and numbers.

Here's the code I am using:

%%pyspark
df = spark.read.load('path/to/Test.parquet', format='parquet')
display(df.limit(10))

column_mapping = {
    "FullName": "Full Name",
}

for old_col, new_col in column_mapping.items():
    df = df.withColumnRenamed(old_col, new_col)
    display(df.limit(10))
df.write.parquet('path/to/Test.parquet', mode='overwrite')

Here is how it overwrites the file (screenshot not included):

How can I correctly overwrite the original Parquet file without creating additional files or directories? Any help is appreciated!


Solution

  • Spark writes a folder instead of a single Parquet file because it splits a DataFrame into partitions and writes each partition in parallel as its own part file. The part files are also compressed with Snappy by default. This answer is based on the Databricks file system (DBFS), so you may need to adapt the file paths to your environment.

    What you need to do is:

    1. Call coalesce(1) on the DataFrame so that df.write produces a single part file in the output folder
    2. Find that single Parquet file in the folder Spark created, using os.listdir
    3. Move it to the path you originally read from, using os.replace
    4. Remove the folder Spark created, together with everything left in it, using shutil.rmtree
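
Steps 2 to 4 involve only ordinary file operations, so they can be tried without a Spark cluster. The sketch below is an illustration under stated assumptions, not part of the original answer: `promote_single_part_file` is a hypothetical helper name, and the folder that `df.coalesce(1).write` would produce is simulated with placeholder files. It also removes a leftover folder at the target path, which is the state described in the question after the first overwrite, because `os.replace` cannot put a file where a directory exists.

```python
import os
import shutil
import tempfile


def promote_single_part_file(spark_output_dir: str, target_path: str) -> str:
    """Move the only *.parquet part file out of spark_output_dir to target_path.

    Hypothetical helper for illustration; not part of the original answer.
    """
    # Step 2: find the part file, ignoring markers such as _SUCCESS
    part_files = [
        name for name in os.listdir(spark_output_dir) if name.endswith(".parquet")
    ]
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")

    # If an earlier write left a folder at the target, remove it first,
    # because os.replace cannot put a file where a directory exists
    if os.path.isdir(target_path):
        shutil.rmtree(target_path)

    # Step 3: move the part file to the target path, replacing any old file
    os.replace(os.path.join(spark_output_dir, part_files[0]), target_path)

    # Step 4: delete the folder Spark created, including _SUCCESS
    shutil.rmtree(spark_output_dir)
    return target_path


# Simulate what a coalesce(1) write leaves behind (placeholder files only)
base = tempfile.mkdtemp()
spark_dir = os.path.join(base, "Test.parquet_temp")
os.makedirs(spark_dir)
open(os.path.join(spark_dir, "_SUCCESS"), "w").close()
with open(os.path.join(spark_dir, "part-00000-abc123.snappy.parquet"), "wb") as fh:
    fh.write(b"placeholder bytes, not real Parquet data")

result = promote_single_part_file(spark_dir, os.path.join(base, "Test.parquet"))
print(os.path.isfile(result), os.path.exists(spark_dir))  # True False
```

The helper raises an error when it finds zero or several part files rather than silently picking the first one, so a missing coalesce(1) is noticed instead of losing data.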

    The function below implements these steps.

    
    import os
    import shutil
    from pathlib import Path
    from typing import Union
    
    from pyspark.sql import DataFrame
    from pyspark.sql import functions as f  # needed for f.lit in the usage example
    
    
    def get_local_path(path: Union[str, Path]) -> str:
        """
        Transforms a potential dbfs path to a
        path accessible by standard file system operations
    
        :param path: Path to transform to local path
        :return: The local path
        """
        return str(path).replace("dbfs:", "/dbfs")
    
    def save_as_one_parquet_file(
        df: DataFrame,
        output_file_path: str,
    ):
        """
        Saves a spark dataframe as a single parquet file.
    
        :param df: The spark dataframe to write.
        :param output_file_path: The output filepath for writing the parquet file.
        """
    
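        local_path = get_local_path(output_file_path)
        # Same temporary folder, addressed once for Spark and once for os/shutil
        tmp_spark_path = output_file_path + "_temp"
        tmp_local_path = local_path + "_temp"
    
        # coalesce(1) forces a single partition, so Spark writes one part file
        (
            df.coalesce(1)
            .write.mode("overwrite")
            .format("parquet")
            .save(tmp_spark_path)
        )
    
        # Pick the part file; _SUCCESS and other marker files are skipped
        part_file = [
            name for name in os.listdir(tmp_local_path) if name.endswith(".parquet")
        ][0]
        # Move it over the original file, then delete the temporary folder
        os.replace(os.path.join(tmp_local_path, part_file), local_path)
        shutil.rmtree(tmp_local_path)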
    
    # Reading in
    path = "dbfs:/mnt/dl2-temp-p-chn-1/test/Flights 1m.parquet"
    df = spark.read.parquet(path)
    
    # Transformations
    df_new = df.withColumn("blub", f.lit(2))
    
    # Saving as one parquet file
    save_as_one_parquet_file(df_new, path)