I'm trying to write the contents of a Spark DataFrame to a CSV file in DBFS. It's important for later functions that the CSV be a single file with the specified name. The DataFrames being written will always be small.
The code I've come up with is as below:
import os
import shutil

file_path = "/dbfs/path/to/csv/file/File.csv"
temp_file_path = "/tmp/temp_file/"
file_string_or_dataframe.coalesce(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save(temp_file_path)
csv_file = [file for file in os.listdir('/dbfs/' + temp_file_path) if file.endswith('.csv')][0]
new_csv_file_path = os.path.join('/dbfs/', os.path.dirname(file_path), csv_file)
os.rename('/dbfs' + os.path.join(temp_file_path, csv_file), new_csv_file_path)
shutil.move(new_csv_file_path, file_path)
shutil.rmtree('/dbfs/' + temp_file_path)
print("Output written to CSV file: " + file_path)
This still writes the CSV to a file whose name begins with "part", and it puts that file inside a folder that has the desired file name. How can I rename the part file and avoid writing to a folder? If I change new_csv_file_path to the desired file name, I get an error saying that it is a directory.
Any way around this would be appreciated! Thanks.
When writing a PySpark DataFrame to a file, Spark will always produce part files by default. This is because output is written per partition, even if there is only one partition.
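You can see this by listing the output directory after a plain write. A minimal illustration, assuming a Databricks notebook where df and dbutils are available (the /tmp/demo_out path is made up):

# a plain write creates a directory, not a single file
df.write.mode("overwrite").csv("/tmp/demo_out")
# the directory contains Spark's bookkeeping files plus one part file per
# partition, e.g. _SUCCESS, _committed_..., part-00000-...csv
display(dbutils.fs.ls("/tmp/demo_out"))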
There are 2 options. If your data is small and fits in the driver node's memory, you can convert the PySpark DataFrame to a pandas DataFrame and then write it to CSV, like so:
df.toPandas().to_csv(path)
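Note that pandas writes the row index as an extra column by default, which you usually don't want in the output. A minimal sketch, assuming the /dbfs FUSE mount is available (the exact path is illustrative):

# write a single CSV file directly via the local /dbfs mount;
# index=False suppresses the pandas index column
df.toPandas().to_csv("/dbfs/test1/File.csv", index=False)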
OR
Alternatively, you can coalesce to 1 partition (like you have done), then copy and rename that single part file to your target file path, and finally clean up the temporary directory:
import os
import shutil

file_path = "/dbfs/test1/File.csv"      # target file, via the local /dbfs mount
temp_file_path = "/tmp/temp_file/"      # temporary DBFS directory for the Spark write

# Spark always writes a directory of part files, so write to a temp directory first
dataframe.coalesce(1).write.format("com.databricks.spark.csv").option("header", True).mode("overwrite").save(temp_file_path)

# locate the single part file produced by the coalesced write
csv_file = [file for file in os.listdir('/dbfs' + temp_file_path) if file.endswith('.csv')][0]

# dbutils.fs works with DBFS paths, so strip the local /dbfs prefix from the destination
dbutils.fs.cp(temp_file_path + csv_file, file_path.replace('/dbfs', '', 1))

# remove the temporary directory
shutil.rmtree('/dbfs' + temp_file_path)
print("Output written to CSV file: " + file_path)