csv, pyspark, azure-databricks

Writing a DataFrame to a CSV file in Azure Databricks


I'm trying to write the contents of a Spark DataFrame to a CSV file in DBFS. It's important for later steps that the output be a single CSV file with the specified name. The DataFrames being written will always be small.

The code I've come up with is as below:

import os
import shutil

file_path = "/dbfs/path/to/csv/file/File.csv"

temp_file_path = "/tmp/temp_file/"

file_string_or_dataframe.coalesce(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save(temp_file_path)

csv_file = [file for file in os.listdir('/dbfs/' + temp_file_path) if file.endswith('.csv')][0]

new_csv_file_path = os.path.join('/dbfs/', os.path.dirname(file_path), csv_file)
os.rename('/dbfs' + os.path.join(temp_file_path, csv_file), new_csv_file_path)

shutil.move(new_csv_file_path, file_path)

shutil.rmtree('/dbfs/' + temp_file_path)

print("Output written to CSV file: " + file_path)

This still writes the CSV to a file whose name begins with "part", placed inside a folder that has the desired file name. How can I rename the part file and avoid writing to a folder?

If I change new_csv_file_path to be the desired file name, I end up with an error saying that it is a directory.

Any way around this would be appreciated! Thanks.


Solution

  • When writing a PySpark DataFrame to a file, Spark will always write part files by default. This is because of partitions, even if there is only 1 partition.

    There are 2 options. If your DataFrame is small and can fit in the driver node's memory, you can convert the PySpark DataFrame to a pandas DataFrame and then write it to CSV like so.

    df.toPandas().to_csv(path)
    
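    For instance, here is a minimal sketch of that route, assuming the DataFrame is called df and the target lives under the /dbfs fuse mount (the path below is hypothetical); index=False stops pandas from writing a row-number column:

    # minimal sketch: write directly to the fuse-mounted DBFS path (hypothetical path)
    df.toPandas().to_csv("/dbfs/test1/File.csv", index=False)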

    OR

    You can coalesce to 1 partition (as you have done), then copy and rename that single partition file to your target file path, and finally clean up the temporary directory, like so.

    import os
    import shutil

    # target path as seen through the /dbfs fuse mount
    file_path = "/dbfs/test1/File.csv"

    # temporary DBFS directory Spark writes its part file into
    temp_file_path = "/tmp/temp_file/"

    dataframe.coalesce(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save(temp_file_path)

    # locate the single part-*.csv file Spark produced in the temp directory
    csv_file = [file for file in os.listdir('/dbfs' + temp_file_path) if file.endswith('.csv')][0]

    old_path = temp_file_path + csv_file
    print(f"old_path : {old_path}")

    # copy the part file to the target name; dbutils.fs works with DBFS paths,
    # so drop the leading /dbfs from the destination
    dbutils.fs.cp(old_path, file_path.replace("/dbfs", "", 1))

    # remove the temporary directory
    shutil.rmtree('/dbfs' + temp_file_path)

    print("Output written to CSV file: " + file_path)