I have this PySpark script in an Azure Databricks notebook:
from pyspark.sql.types import StructType, StringType

# Authenticate to the ADLS Gen2 account with the storage account key
spark.conf.set(
    "fs.azure.account.key.gcdmchndev01c.dfs.core.chinacloudapi.cn",
    "<storage account key>"
)
inputfile = "abfss://raw@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv"
outputpath = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code"
outputpath_bad = "abfss://spark-processed@gcdmchndev01c.dfs.core.chinacloudapi.cn/bad/test/CODI_Ignored_Hospital_Code"
pipelineId = "test111111"
rawpath = "raw/test/CODI_Ignored_Hospital_Code"
rawfilename = "CODI_Ignored_Hospital_Code.csv"

# The schema string uses Python literals (False/True), so it is parsed with eval() rather than json.loads()
jsonTableSchema = "{'type': 'struct', 'fields': [{'name': 'id', 'type': 'integer', 'nullable': False, 'metadata': {}}, {'name': 'hospital_ddi_code', 'type': 'string', 'nullable': True, 'metadata': {}}]}"
stSchema = StructType.fromJson(eval(jsonTableSchema))
# Extra column that receives the raw record when a row fails to parse
stSchema.add("ValidateErrorMessage", StringType(), True)
# PERMISSIVE mode keeps malformed rows and writes them into the corrupt-record column
raw = spark.read.csv(inputfile, schema=stSchema, header=True,
                     columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')
raw.createOrReplaceTempView("tv_raw")
dfClean = spark.sql(f"""
    select
        *,
        nvl2(ValidateErrorMessage, 'failed', 'success') as ValidateStatus,
        now() + INTERVAL 8 HOURS as ProcessDate,
        '{pipelineId}' as ProcessId,
        '{rawpath}' as RawPath,
        '{rawfilename}' as RawFileName
    from
        tv_raw
""")
#dfClean.printSchema()
dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
#dfBadRecord.cache()
badCount = dfBadRecord.count()  # the first action, which forces Spark to actually read the file
This gives me an error on the last line:
badCount = dfBadRecord.count()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 39, 10.139.64.6, executor 1): com.databricks.sql.io.FileReadException: Error while reading file abfss:REDACTED_LOCAL_PART@gcdmchndev01c.dfs.core.chinacloudapi.cn/test/CODI_Ignored_Hospital_Code.csv
And the detailed traceback:
Py4JJavaError Traceback (most recent call last)
<command-3450774645335260> in <module>
34 dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
35 #dfBadRecord.cache()
---> 36 badCount = dfBadRecord.count()
37
/databricks/spark/python/pyspark/sql/dataframe.py in count(self)
584 2
585 """
--> 586 return int(self._jdf.count())
587
588 @ignore_unicode_prefix
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
I contacted Microsoft Azure support, and they suggested adding .cache() at the end of the read, i.e. using this:
raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE').cache()
instead of:
raw = spark.read.csv(inputfile, schema=stSchema, header=True, columnNameOfCorruptRecord='ValidateErrorMessage', mode='PERMISSIVE')
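If I understand the suggestion correctly, this works because of documented Spark behavior: since Spark 2.3, queries over raw CSV/JSON files are disallowed when the referenced columns include only the internal corrupt-record column, and the documented workaround is to cache (or save) the parsed results and run the same query against those. Here is a minimal sketch of the workaround in the context of my script (same placeholder account key, paths, and schema as above):

raw = spark.read.csv(inputfile, schema=stSchema, header=True,
                     columnNameOfCorruptRecord='ValidateErrorMessage',
                     mode='PERMISSIVE').cache()  # materialize the parsed rows once
raw.createOrReplaceTempView("tv_raw")

dfClean = spark.sql(f"""
    select
        *,
        nvl2(ValidateErrorMessage, 'failed', 'success') as ValidateStatus,
        now() + INTERVAL 8 HOURS as ProcessDate,
        '{pipelineId}' as ProcessId,
        '{rawpath}' as RawPath,
        '{rawfilename}' as RawFileName
    from
        tv_raw
""")

# The count now runs against the cached, fully parsed rows instead of
# re-planning a CSV scan that touches only the corrupt-record column.
dfBadRecord = dfClean.filter(dfClean["ValidateErrorMessage"].isNotNull())
badCount = dfBadRecord.count()

As far as I can tell, the cache has to sit on the parsed source (raw) rather than on dfBadRecord, because otherwise the first count() would still plan a file scan that references only the corrupt-record column.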