I have a notebook using PySpark. The section I am struggling with needs to check whether a directory in a container exists on Azure Data Lake Storage Gen2.
If the directory exists, I can read the underlying data files into dfStaging and check whether dfStaging contains any data. If it does, I want to merge dfStaging with another DataFrame, dfLanding, into dfMerge; otherwise I just work with dfLanding.
However, if the directory does not yet exist, I need to know that so I can handle the if statement in the code below correctly:
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

for row in collectPartition:
    dfLanding = df.filter(col("_partition") == row["_partition"])
    dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)  # ERROR: if the location does not yet exist, this read fails
    dfLanding.printSchema()
    dfStaging.printSchema()
    if dfStaging.count() == 0:
        # first time - create new dataset
        dfLanding.write.mode('overwrite').parquet(row["_location"])
    else:
        # second time - merge datasets
        dfLanding = dfLanding.withColumn('_source', lit(1))
        dfStaging = dfStaging.withColumn('_source', lit(2))
        dfMerge = dfLanding.union(dfStaging)
        partition = Window.partitionBy("RECID").orderBy(col("_source"))
        dfMerge = dfMerge.withColumn("_version", row_number().over(partition))
        dfMerge = dfMerge.filter(col("_version") == 1)
        dfMerge = dfMerge.drop("_source")
        dfMerge = dfMerge.drop("_version")
        dfMerge.write.mode('overwrite').parquet(row["_location"])
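(A minimal sketch of one possible workaround is to catch the failing read instead of checking the directory up front. This assumes the missing path surfaces as pyspark.sql.utils.AnalysisException and that dataSchema is a StructType; read_staging_or_empty is just a name made up for this sketch.)

from pyspark.sql.utils import AnalysisException

def read_staging_or_empty(location):
    # Read the staging parquet at `location`; if the path is missing,
    # return an empty DataFrame so the count() == 0 branch above would
    # take the "first time" path.
    try:
        return spark.read.load(location, format='parquet', schema=dataSchema)
    except AnalysisException:
        return spark.createDataFrame([], dataSchema)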
You can try the code below to check whether the folder exists, and branch your logic based on that:
for row in collectPartition:
    # List the parent folder and check whether this partition's directory is in it.
    stg_dir = row["_location"].replace(row["_partition"], "")
    dirs = [i.path for i in mssparkutils.fs.ls(stg_dir)]
    if row["_location"] in dirs:
        print(row["_location"])
        dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)
        if dfStaging.count() == 0:
            pass  # first time: create dataset code
        else:
            pass  # merge data code
    else:
        # handle if dir does not exist
        mssparkutils.fs.mkdirs(row["_location"])
        print(row["_location"] + " created")
To handle the case where the folder does not exist yet, create the folder and write the landing data to it.
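Putting that together with the merge logic from your question, a sketch of the full loop could look like the following. It reuses collectPartition, df, and dataSchema exactly as defined in your notebook and is a sketch rather than a tested implementation:

from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

for row in collectPartition:
    dfLanding = df.filter(col("_partition") == row["_partition"])

    # List the parent folder and check whether this partition's directory exists.
    stg_dir = row["_location"].replace(row["_partition"], "")
    dirs = [i.path for i in mssparkutils.fs.ls(stg_dir)]

    if row["_location"] not in dirs:
        # Directory does not exist yet: write the landing data as the first version.
        dfLanding.write.mode('overwrite').parquet(row["_location"])
        continue

    dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)

    if dfStaging.count() == 0:
        # Directory exists but holds no data: same as the first run.
        dfLanding.write.mode('overwrite').parquet(row["_location"])
    else:
        # Merge: keep one row per RECID, preferring the landing (newest) data.
        dfLanding = dfLanding.withColumn('_source', lit(1))
        dfStaging = dfStaging.withColumn('_source', lit(2))
        dfMerge = dfLanding.union(dfStaging)
        w = Window.partitionBy("RECID").orderBy(col("_source"))
        dfMerge = (dfMerge
                   .withColumn("_version", row_number().over(w))
                   .filter(col("_version") == 1)
                   .drop("_source", "_version"))
        dfMerge.write.mode('overwrite').parquet(row["_location"])

Note that, as in your original code, the merged result overwrites the same location it was read from.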
(Screenshots in the original answer show the files in the container and the resulting output.)
So, you can use these if-else blocks in your merging and dataset-creation code.
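As an aside, if you would rather test the path directly than list the parent folder, the Hadoop FileSystem API reachable through Spark's JVM gateway can do an existence check. This sketch relies on the internal _jvm and _jsc handles (implementation details rather than public PySpark API), so treat it as an alternative to adapt rather than a guaranteed recipe:

def path_exists(spark, path_str):
    # Uses the Hadoop FileSystem bound to the given path (e.g. ABFS for ADLS Gen2).
    jvm = spark._jvm
    conf = spark._jsc.hadoopConfiguration()
    path = jvm.org.apache.hadoop.fs.Path(path_str)
    fs = path.getFileSystem(conf)
    return fs.exists(path)

# Usage inside the loop:
# if path_exists(spark, row["_location"]):
#     dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)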