Tags: apache-spark, pyspark, databricks, cluster-computing, hdf

Working with hdf files in Databricks cluster


I am trying to create a simple .hdf file in the Databricks environment. I can create the file on the driver, but when the same code is executed via rdd.map(), it throws the following exception.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 287.0 failed 4 times, most recent failure: Lost task 3.3 in stage 287.0 (TID 1080) (10.67.238.26 executor 1): org.apache.spark.api.python.PythonException: 'RuntimeError: Can't decrement id ref count (file write failed: time = Tue Nov  1 11:38:44 2022
, filename = '/dbfs/mnt/demo.hdf', file descriptor = 7, errno = 95, error message = 'Operation not supported', buf = 0x30ab998, total write size = 40, bytes this sub-write = 40, bytes actually written = 18446744073709551615, offset = 0)', from <command-1134257524229717>, line 13. Full traceback below:

I can write the same file on the worker and copy it back to /dbfs/mnt. However, I was looking for a way to write/modify the .hdf files stored in the /dbfs/mnt location directly from the worker nodes.

def create_hdf_file_tmp(x):
    import h5py
    import pandas as pd

    dummy_data = [1, 2, 3, 4, 5]
    df_data = pd.DataFrame(dummy_data, columns=['Numbers'])

    with h5py.File(x, 'w') as f:  # x is the target path, e.g. '/dbfs/mnt/demo.hdf'
        dset = f.create_dataset('default', data=df_data)  # write to the .hdf file

    return True

def read_hdf_file(file_name, dname):
    import h5py

    with h5py.File(file_name, 'r') as f:
        data = f[dname][:]  # read into memory before the file is closed
        print(data[:5])
    return data


#driver code
rdd = spark.sparkContext.parallelize(['/dbfs/mnt/demo.hdf'])
result = rdd.map(lambda x: create_hdf_file_tmp(x)).collect()

Above is the minimal code that I am trying to run in a Databricks notebook with 1 driver and 2 worker nodes.
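For reference, a rough sketch of the worker-side workaround mentioned above (write to a worker-local temporary path first, then copy the finished file onto the mount; the helper name and local path are illustrative):

def create_hdf_file_via_local(path):
    import shutil
    import h5py
    import pandas as pd

    local_path = '/tmp/demo.hdf'  # worker-local disk, where h5py's random writes work
    dummy_data = [1, 2, 3, 4, 5]
    df_data = pd.DataFrame(dummy_data, columns=['Numbers'])

    with h5py.File(local_path, 'w') as f:
        f.create_dataset('default', data=df_data)

    shutil.copy(local_path, path)  # sequential copy to /dbfs/mnt/... via the FUSE mount
    return True

rdd = spark.sparkContext.parallelize(['/dbfs/mnt/demo.hdf'])
result = rdd.map(create_hdf_file_via_local).collect()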


Solution

  • From the error message (Operation not supported): most probably the HDF5 library uses random writes when writing the file, and those aren't supported by DBFS (see the DBFS Local API limitations in the docs). You will need to write the file to a local disk first and then move it to the DBFS mount. But that will work only on the driver node...
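A minimal sketch of that driver-side approach, assuming h5py is available in the notebook; the local path and target path are illustrative, and dbutils.fs.cp is used to move the finished file (dbutils is only available on the driver):

import h5py
import pandas as pd

local_path = '/tmp/demo.hdf'  # driver-local disk supports the random writes h5py needs
df_data = pd.DataFrame([1, 2, 3, 4, 5], columns=['Numbers'])

with h5py.File(local_path, 'w') as f:
    f.create_dataset('default', data=df_data)

# Move the finished file onto the DBFS mount (a sequential copy, which DBFS supports)
dbutils.fs.cp(f'file:{local_path}', 'dbfs:/mnt/demo.hdf')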