apache-spark, pyspark, databricks

DataFrame.write() produces a CSV file on a single-node jobs cluster, but not on a 2+1 node cluster


Edit: This happens irrespective of whether the DataFrame is empty or not.

I'm writing (df.repartition(1).write.save()) an empty DataFrame to the driver's local hard disk. I use header=True, so the expected output is a CSV file with just the header row.

When I run it on a single-node jobs cluster (Databricks), it produces a single CSV file with just the header. But the same code produces no CSV file when I run it on a multi-node jobs cluster (2 workers, 1 driver), although it appears to write something.

Any thoughts?


Code:

import os, shutil, tempfile
from pathlib import Path
from pyspark.sql import SparkSession, DataFrame

def write_as_one_file(spark_session: SparkSession, df: DataFrame, out_dir: str,
                      file_format: str = 'csv', compression: str = 'gzip') -> str:
    """
    @return: full path to the output file
    """
    # When running on Databricks cluster, we need to prefix path with "file:" for it to work.
    prefix = '' if (not spark_session or spark_session.conf.get('spark.app.name') != 'Databricks Shell') else 'file:'
    save_options = {'format': file_format, 'header': True}
    if compression:
        save_options['compression'] = compression

    print(f'prefix: {prefix}, save_options: {save_options}')
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        tmp_df_dir = f'{prefix}{tmp_dir_name}/df'
        print(f'tmp_df_dir: {tmp_df_dir}')
        df.repartition(1).write.save(path=tmp_df_dir, **save_options)
        files = list(Path(tmp_dir_name).rglob(f'part*.{file_format}*'))
        print(f'listing -- tmp_df_dir: {list(Path(tmp_dir_name).rglob("*"))}')
        if len(files) != 1:
            raise Exception(f'expected exactly one file to match pattern, found "{files}"')

        os.makedirs(out_dir, exist_ok=True)
        out_file = f'{out_dir}/{os.path.basename(files[0])}'
        shutil.copy(files[0], out_file)
        return out_file

df1 = spark.createDataFrame(data=[], schema='id:int')
write_as_one_file(spark, df1, '/home/kash/output_dir/')

When run on a single-node cluster, it produces one csv.gz file:

prefix: file:, save_options: {'format': 'csv', 'header': True, 'compression': 'gzip'}
tmp_df_dir: file:/tmp/tmp3xl8rvi7/df
listing -- tmp_df_dir: [
    PosixPath('/tmp/tmp3xl8rvi7/df'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/_SUCCESS'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/.part-00000-tid-1201247493872524145-a40b5e0b-2dde-4edc-98ec-c40cbbf6a29d-1-1-c000.csv.gz.crc'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/part-00000-tid-1201247493872524145-a40b5e0b-2dde-4edc-98ec-c40cbbf6a29d-1-1-c000.csv.gz'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/_committed_1201247493872524145'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/._SUCCESS.crc'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/._committed_1201247493872524145.crc'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/_started_1201247493872524145'), 
    PosixPath('/tmp/tmp3xl8rvi7/df/._started_1201247493872524145.crc')
]

When run on a multi-node cluster, it doesn't produce any csv.gz file, although it does "write" the DataFrame:

prefix: file:, save_options: {'format': 'csv', 'header': True, 'compression': 'gzip'}
tmp_df_dir: file:/tmp/tmpj2ki4hpl/df
listing -- tmp_df_dir: [
    PosixPath('/tmp/tmpj2ki4hpl/df'), 
    PosixPath('/tmp/tmpj2ki4hpl/df/._committed_637987277244855410.crc'), 
    PosixPath('/tmp/tmpj2ki4hpl/df/_SUCCESS'), 
    PosixPath('/tmp/tmpj2ki4hpl/df/_committed_637987277244855410'), 
    PosixPath('/tmp/tmpj2ki4hpl/df/._SUCCESS.crc')
]

# and throws the Exception here...

Using Databricks Runtime 13.3 LTS (Spark 3.4.1). With or without Photon makes no difference.

Node types: single-node cluster = r6id.xlarge, multi-node cluster = i3.xlarge.


Solution

  • The tempfile.TemporaryDirectory() creates a temp directory that only exists on the driver node, not on the workers. With a file: destination, each task writes to the local filesystem of the node it runs on, so on a multi-node cluster the single part file lands in a worker's local /tmp, while the driver only writes the commit markers (_SUCCESS, _committed_*) into its own copy of the directory; that is exactly what the multi-node listing shows. On a single-node cluster the driver and executor share the same disk, so the file appears where expected.

    In a distributed Spark cluster you need a location that is visible to all nodes, such as DBFS, EFS, S3, or Azure Data Lake Storage, for example (a sketch of how the rest of the function then picks the file up follows the snippet):


        # Create the temp directory on DBFS (through the /dbfs FUSE mount) so it
        # is visible to the driver and to every worker node.
        with tempfile.TemporaryDirectory(dir='/dbfs') as tmp_dir_name:
            tmp_df_dir = f'{tmp_dir_name}/df'
            print(f'tmp_df_dir: {tmp_df_dir}')
            df.repartition(1).write.save(
                # Spark addresses DBFS with the dbfs: scheme, while driver-local
                # Python file APIs see the same files under the /dbfs mount.
                path=tmp_df_dir.replace('/dbfs', 'dbfs:', 1),
                **save_options
            )
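
    A minimal sketch (not from the original answer) of how the rest of write_as_one_file can then locate and copy the file, assuming the /dbfs FUSE mount is available on the driver: Spark wrote to the dbfs: URI, but the same files are visible to driver-side Python code under /dbfs, so the existing glob-and-copy logic continues to work.

            # Sketch, assuming the /dbfs FUSE mount: the dbfs:/... directory Spark
            # wrote to is also visible to local Python file APIs under /dbfs/...
            files = list(Path(tmp_dir_name).rglob(f'part*.{file_format}*'))
            if len(files) != 1:
                raise Exception(f'expected exactly one file to match pattern, found "{files}"')

            os.makedirs(out_dir, exist_ok=True)
            out_file = f'{out_dir}/{os.path.basename(files[0])}'
            # Copy from the DBFS FUSE path to the driver-local output directory.
            shutil.copy(files[0], out_file)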