Edit: This happens irrespective of whether the DataFrame is empty or not.
I'm writing an empty DataFrame to the driver's local hard disk using df.repartition(1).write.save(). I pass header=True, so the expected output is a CSV file containing just the header row.
When I run it on a single-node jobs cluster (Databricks), it produces a single CSV file with just the header. The same code produces no CSV file when I run it on a multi-node jobs cluster (2 workers, 1 driver), although it does appear to write something.
Any thoughts?
Code:
import os, shutil, tempfile
from pathlib import Path
from pyspark.sql import SparkSession, DataFrame

def write_as_one_file(spark_session: SparkSession, df: DataFrame, out_dir: str,
                      file_format: str = 'csv', compression: str = 'gzip') -> str:
    """
    @return: full path to the output file
    """
    # When running on a Databricks cluster, we need to prefix the path with "file:" for it to work.
    prefix = '' if (not spark_session or spark_session.conf.get('spark.app.name') != 'Databricks Shell') else 'file:'
    save_options = {'format': file_format, 'header': True}
    if compression:
        save_options['compression'] = compression
    print(f'prefix: {prefix}, save_options: {save_options}')
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        tmp_df_dir = f'{prefix}{tmp_dir_name}/df'
        print(f'tmp_df_dir: {tmp_df_dir}')
        df.repartition(1).write.save(path=tmp_df_dir, **save_options)
        files = list(Path(tmp_dir_name).rglob(f'part*.{file_format}*'))
        print(f'listing -- tmp_df_dir: {list(Path(tmp_dir_name).rglob("*"))}')
        if len(files) != 1:
            raise Exception(f'expected exactly one file to match pattern, found "{files}"')
        os.makedirs(out_dir, exist_ok=True)
        out_file = f'{out_dir}/{os.path.basename(files[0])}'
        shutil.copy(files[0], out_file)
        return out_file
df1 = spark.createDataFrame(data=[], schema='id:int')
write_as_one_file(spark, df1, '/home/kash/output_dir/')
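The file-selection step in write_as_one_file depends only on Spark's part-file naming convention, so it can be exercised without Spark at all. A minimal standalone sketch (plain Python; the file names below are made up to mimic a Spark output directory):

```python
import tempfile
from pathlib import Path

# Simulate a Spark output directory: one data file plus commit/CRC artifacts.
with tempfile.TemporaryDirectory() as d:
    out = Path(d) / 'df'
    out.mkdir()
    for name in ['_SUCCESS', '._SUCCESS.crc',
                 'part-00000-tid-123-c000.csv.gz',
                 '.part-00000-tid-123-c000.csv.gz.crc']:
        (out / name).touch()

    # Same pattern as in write_as_one_file: it matches the data file, but not
    # the hidden .crc copy, whose name starts with '.' rather than 'part'.
    files = list(Path(d).rglob('part*.csv*'))
    print(len(files), files[0].name)  # 1 part-00000-tid-123-c000.csv.gz
```

This is why the function can safely assert len(files) == 1 on a successful single-node write: only the real data file matches the pattern.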
When run on a single-node cluster, it produces one csv.gz file:
prefix: file:, save_options: {'format': 'csv', 'header': True, 'compression': 'gzip'}
tmp_df_dir: file:/tmp/tmp3xl8rvi7/df
listing -- tmp_df_dir: [
PosixPath('/tmp/tmp3xl8rvi7/df'),
PosixPath('/tmp/tmp3xl8rvi7/df/_SUCCESS'),
PosixPath('/tmp/tmp3xl8rvi7/df/.part-00000-tid-1201247493872524145-a40b5e0b-2dde-4edc-98ec-c40cbbf6a29d-1-1-c000.csv.gz.crc'),
PosixPath('/tmp/tmp3xl8rvi7/df/part-00000-tid-1201247493872524145-a40b5e0b-2dde-4edc-98ec-c40cbbf6a29d-1-1-c000.csv.gz'),
PosixPath('/tmp/tmp3xl8rvi7/df/_committed_1201247493872524145'),
PosixPath('/tmp/tmp3xl8rvi7/df/._SUCCESS.crc'),
PosixPath('/tmp/tmp3xl8rvi7/df/._committed_1201247493872524145.crc'),
PosixPath('/tmp/tmp3xl8rvi7/df/_started_1201247493872524145'),
PosixPath('/tmp/tmp3xl8rvi7/df/._started_1201247493872524145.crc')
]
When run on a multi-node cluster, it doesn't produce any csv.gz file, although it does "write" the DataFrame:
prefix: file:, save_options: {'format': 'csv', 'header': True, 'compression': 'gzip'}
tmp_df_dir: file:/tmp/tmpj2ki4hpl/df
listing -- tmp_df_dir: [
PosixPath('/tmp/tmpj2ki4hpl/df'),
PosixPath('/tmp/tmpj2ki4hpl/df/._committed_637987277244855410.crc'),
PosixPath('/tmp/tmpj2ki4hpl/df/_SUCCESS'),
PosixPath('/tmp/tmpj2ki4hpl/df/_committed_637987277244855410'),
PosixPath('/tmp/tmpj2ki4hpl/df/._SUCCESS.crc')
]
# and throws the Exception here...
Using Databricks Runtime 13.3 LTS (Spark 3.4.1). With or without Photon makes no difference. Node types: single-node cluster = r6id.xlarge, multi-node cluster = i3.xlarge.
The tempfile.TemporaryDirectory() refers to a temp directory that is only created and available on the driver node, not on the workers. On a single-node cluster the driver also runs the executor, so the write happens to land on the same disk; on a multi-node cluster the executors write their part files to their own local disks, and only the commit markers appear on the driver.
In a distributed spark cluster, you would need to use a distributed file system as well, such as DBFS, EFS, S3, or Azure Data Lake Storage.
# Create the temp dir on DBFS (mounted at /dbfs on the driver) so the
# executors can reach it too, then address it via the dbfs: URI scheme
# when writing from Spark.
with tempfile.TemporaryDirectory(dir='/dbfs') as tmp_dir_name:
    tmp_df_dir = f'{tmp_dir_name}/df'
    print(f'tmp_df_dir: {tmp_df_dir}')
    df.repartition(1).write.save(
        path=tmp_df_dir.replace('/dbfs', 'dbfs:'),  # /dbfs/... (FUSE) -> dbfs:/... (URI)
        **save_options
    )
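The two spellings of the same location are easy to mix up: the driver sees DBFS as a local FUSE mount under /dbfs/..., while Spark APIs want the dbfs:/... URI. A small hypothetical helper (plain Python; the function names are my own, not a Databricks API) for converting between the two forms:

```python
def fuse_to_uri(path: str) -> str:
    """Convert a driver-local FUSE path (/dbfs/...) to a Spark URI (dbfs:/...)."""
    if not path.startswith('/dbfs/'):
        raise ValueError(f'not a DBFS FUSE path: {path}')
    return 'dbfs:' + path[len('/dbfs'):]

def uri_to_fuse(uri: str) -> str:
    """Convert a Spark URI (dbfs:/...) back to the driver-local FUSE path."""
    if not uri.startswith('dbfs:/'):
        raise ValueError(f'not a dbfs: URI: {uri}')
    return '/dbfs' + uri[len('dbfs:'):]

print(fuse_to_uri('/dbfs/tmp/tmpj2ki4hpl/df'))  # dbfs:/tmp/tmpj2ki4hpl/df
print(uri_to_fuse('dbfs:/tmp/tmpj2ki4hpl/df'))  # /dbfs/tmp/tmpj2ki4hpl/df
```

With this split, the Spark write uses the dbfs: URI while the driver-side listing and copy in write_as_one_file keep using the /dbfs FUSE path, so both sides see the same files.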