Tags: databricks, azure-databricks

Azure Databricks DataFrame count generates error com.databricks.sql.io.FileReadException: Error while reading file abfss:REDACTED_LOCAL_PART


I have this PySpark script in an Azure Databricks notebook:

    import ast
    from pyspark.sql.types import StructType
    from pyspark.sql.types import StringType

    # Authenticate to ADLS Gen2 with the storage account access key
    spark.conf.set(
        "fs.azure.account.key.gcdmchndev01c.dfs.core.chinacloudapi.cn",
        "<storage account key>"
    )

    inputfile = "abfss://raw@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv"
    outputpath = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code"
    jsonTableSchema = "{'type': 'struct', 'fields': [{'name': 'id', 'type': 'integer', 'nullable': False, 'metadata': {} }, {'name': 'hospital_ddi_code', 'type': 'string', 'nullable': True, 'metadata': {} } ]}"
    pipelineId = "test111111"
    rawpath = "raw/test/CODI_Ignored_Hospital_Code"
    rawfilename = "CODI_Ignored_Hospital_Code.csv"
    outputpath_bad = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/bad/test/CODI_Ignored_Hospital_Code"

    # The schema string is a Python dict literal (single quotes, True/False),
    # so parse it with ast.literal_eval rather than eval
    stSchema = StructType.fromJson(ast.literal_eval(jsonTableSchema))

    # Extra column that PERMISSIVE mode fills with the raw text of malformed rows
    stSchema.add("ValidateErrorMessage", StringType(), True)

    # PERMISSIVE mode keeps malformed rows and flags them in ValidateErrorMessage
    raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')

    raw.createOrReplaceTempView("tv_raw")
    dfClean = spark.sql("""
    select
        *,
        nvl2(ValidateErrorMessage, 'failed', 'success') as ValidateStatus,
        now() + INTERVAL 8 HOURS as ProcessDate,
        '""" + pipelineId + """' as ProcessId,
        '""" + rawpath + """' as RawPath,
        '""" + rawfilename + """' as RawFileName
    from
        tv_raw
    """)
    #dfClean.printSchema()

    # Rows that failed schema validation
    dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
    #dfBadRecord.cache()
    badCount = dfBadRecord.count()

And this gives me the error below on the last line (spark.read is lazy, so the file is only actually read once the count() action runs):

    badCount = dfBadRecord.count()


    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 39, 10.139.64.6, executor 1): com.databricks.sql.io.FileReadException: Error while reading file abfss:REDACTED_LOCAL_PART@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv

And the detailed traceback:

    Py4JJavaError                             Traceback (most recent call last)
    <command-3450774645335260> in <module>
         34 dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
         35 #dfBadRecord.cache()
    ---> 36 badCount = dfBadRecord.count()
         37 

    /databricks/spark/python/pyspark/sql/dataframe.py in count(self)
        584         2
        585         """
    --> 586         return int(self._jdf.count())
        587 
        588     @ignore_unicode_prefix

    /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1303         answer = self.gateway_client.send_command(command)
       1304         return_value = get_return_value(
    -> 1305             answer, self.gateway_client, self.target_id, self.name)
       1306 
       1307         for temp_arg in temp_args:

Solution

  • I contacted MS Azure support and they suggested adding .cache() at the end of the read, so use this:

        raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE').cache()
    

    instead of:

        raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')
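
    Why this likely works: since Spark 2.3, queries over raw CSV/JSON files are disallowed when the referenced columns effectively reduce to the internal corrupt-record column (here renamed to ValidateErrorMessage via columnNameOfCorruptRecord), because that column is only populated while the file is being parsed. Caching or saving the parsed DataFrame materializes every column first, after which filtering and counting on it are safe. A minimal sketch of the whole pattern, reusing spark, inputfile and stSchema from the question:

        # Cache the parsed result before querying the corrupt-record column
        raw = spark.read.csv(
            inputfile,
            schema=stSchema,
            header=True,
            columnNameOfCorruptRecord='ValidateErrorMessage',
            mode='PERMISSIVE'
        ).cache()

        raw.count()  # action that populates the cache, ValidateErrorMessage included

        dfBadRecord = raw.filter(raw['ValidateErrorMessage'].isNotNull())
        badCount = dfBadRecord.count()  # now answered from the cache, no file re-read

        raw.unpersist()  # optionally release the cached data when done

    The Spark SQL migration notes describe the same workaround for the corrupt-record column (cache or save the parsed results, then query them), so the behaviour does not appear to be Databricks-specific.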