Tags: pyspark, azure-blob-storage, azure-data-factory

ADF PySpark Notebook: Check if a Directory Exists on an Azure Storage Account


I have a notebook using PySpark. The section I am struggling with needs to check whether a directory in a container exists on Azure Data Lake Storage Gen2.


If the directory exists, I can read the underlying data files into dfStaging and also check whether dfStaging contains any data. In that case I want to merge dfStaging with another DataFrame, dfLanding, into dfMerge.

Otherwise I just work with dfLanding.

However, if the directory does not yet exist, I need to know, so that I can handle this if statement correctly:

    from pyspark.sql.functions import col, lit, row_number
    from pyspark.sql.window import Window

    for row in collectPartition:
        dfLanding = df.filter(col("_partition") == row["_partition"])

        # ERROR: if the location does not yet exist, this read fails
        dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)

        dfLanding.printSchema()
        dfStaging.printSchema()

        if dfStaging.count() == 0:
            # first time - create new dataset
            dfLanding.write.mode('overwrite').parquet(row["_location"])
        else:
            # second time - merge datasets, keeping the landing row per RECID
            dfLanding = dfLanding.withColumn('_source', lit(1))
            dfStaging = dfStaging.withColumn('_source', lit(2))

            dfMerge = dfLanding.union(dfStaging)
            partition = Window.partitionBy("RECID").orderBy(col("_source"))
            dfMerge = dfMerge.withColumn("_version", row_number().over(partition))

            dfMerge = dfMerge.filter(col("_version") == 1)
            dfMerge = dfMerge.drop("_source", "_version")
            dfMerge.write.mode('overwrite').parquet(row["_location"])
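
One possible guard (a sketch, not part of the original code) is to attempt the read and fall back when Spark reports a missing path; spark.read on a nonexistent location raises AnalysisException with a "Path does not exist" message:

    from pyspark.sql.utils import AnalysisException

    # Inside the loop above: try the read, fall back if the path is missing
    try:
        dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)
        stagingExists = True
    except AnalysisException:
        stagingExists = False  # path does not exist yet (or another analysis error)

Note that catching AnalysisException is broad (it also covers schema mismatches and similar analysis errors), which is why an explicit existence check, as in the solution below, is often preferred.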

Solution

  • You can try the code below to check for the folder and, based on the result, execute your code.

    for row in collectPartition:
        # list the parent directory and check whether the partition path is in it
        stg_dir = row["_location"].replace(row["_partition"], "")
        dirs = [i.path for i in mssparkutils.fs.ls(stg_dir)]
        if row["_location"] in dirs:
            print(row["_location"])
            dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)
            if dfStaging.count() == 0:
                pass  # first time - create dataset code
            else:
                pass  # merge data code
        else:
            # handle the case where the directory does not exist
            mssparkutils.fs.mkdirs(row["_location"])
            print(row["_location"] + " created")
            
    

    The way to handle a folder that does not exist is to create the folder and then write to it.
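
    If you prefer to test the path directly instead of listing the parent directory, another common approach (a sketch, not from this answer, and usable on any PySpark runtime, not just Synapse) is the Hadoop FileSystem API via the JVM gateway. Note that spark._jvm and spark._jsc are internal accessors, though widely used for this purpose:

        def path_exists(spark, location):
            # Resolve the FileSystem for the path's scheme (abfss://, wasbs://, ...)
            hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(location)
            fs = hadoop_path.getFileSystem(spark._jsc.hadoopConfiguration())
            return fs.exists(hadoop_path)

        # Usage inside the loop:
        if path_exists(spark, row["_location"]):
            dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)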

    File in container: (screenshot)

    Output: (screenshot)

    So, you can use these if-else blocks around your merge and dataset-creation code.
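
    Putting it together, a combined sketch of the loop might look like the following; the existence check comes from this answer and the merge logic from the question:

        from pyspark.sql.functions import col, lit, row_number
        from pyspark.sql.window import Window

        for row in collectPartition:
            dfLanding = df.filter(col("_partition") == row["_partition"])

            stg_dir = row["_location"].replace(row["_partition"], "")
            dirs = [i.path for i in mssparkutils.fs.ls(stg_dir)]

            if row["_location"] not in dirs:
                # first run: the directory does not exist yet, so write landing data directly
                dfLanding.write.mode('overwrite').parquet(row["_location"])
                continue

            dfStaging = spark.read.load(row["_location"], format='parquet', schema=dataSchema)
            if dfStaging.count() == 0:
                # directory exists but is empty: same as the first run
                dfLanding.write.mode('overwrite').parquet(row["_location"])
            else:
                # merge: prefer the landing row (_source == 1) per RECID
                dfLanding = dfLanding.withColumn('_source', lit(1))
                dfStaging = dfStaging.withColumn('_source', lit(2))
                dfMerge = dfLanding.union(dfStaging)
                partition = Window.partitionBy("RECID").orderBy(col("_source"))
                dfMerge = (dfMerge
                           .withColumn("_version", row_number().over(partition))
                           .filter(col("_version") == 1)
                           .drop("_source", "_version"))
                # NOTE: overwriting a path that dfMerge is also reading from may
                # require materializing dfMerge first, depending on the Spark runtime
                dfMerge.write.mode('overwrite').parquet(row["_location"])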