Tags: pyspark, azure-databricks, azure-synapse, azure-data-lake-gen2

How to read multiple files from a folder in ADLS Gen2 using PySpark in Azure Synapse and use them for processing?


I am looking for a solution where I want to read all the files from a folder named 'COUNTRIES DETAIL', which contains another folder 'YEAR', inside a container named 'DETAILS' in ADLS Gen2. Currently I have 200 files in the subfolder 'YEAR'. The file names follow the format 'YYYY_DETAILS_GERMANY_GERMAN_.csv', 'YYYY_DETAILS_INDIA_GOOD_.csv', 'YYYY_DETAILS_ENGLAND_PRODUCTS_.csv'.

I am looking for an approach through which I can read all the files from the folder and use only the file that is required for transformations. The scenario is: once I have read the files from the folder, I want to pick only the file that is required for operations.

HOW SHOULD WE PICK THE FILE, OR WHICH FILE SHOULD WE PICK?

I am already reading a file from another folder and storing its data in a dataframe df. The dataframe has a column called 'COUNTRY_NAME', and every row in that column holds the same value. Suppose the value in the COUNTRY_NAME column is 'INDIAGOOD'; then I need to pick the data of the 'YYYY_DETAILS_INDIA_GOOD_.csv' file and load it into another dataframe. I need to implement the same scenario for the other cases: if the value in the COUNTRY_NAME column matches a file name present in ADLS, I want to load that file's data into a dataframe and do operations on it. The COUNTRY_NAME column will always hold a common value whose text matches one of the files in the 'YEAR' folder.

SAMPLE DATAFRAME 1

+------------+
|COUNTRY_NAME|
+------------+
|INDIAGOOD   |
|INDIAGOOD   |
|INDIAGOOD   |
|INDIAGOOD   |
+------------+

FILENAMES IN ADLS

+----------------------------------+
|FILE_NAMES                        |
+----------------------------------+
|YYYY_DETAILS_INDIA_GOOD_.csv      |
|YYYY_DETAILS_ENGLAND_PRODUCTS_.csv|
|YYYY_DETAILS_GERMANY_GERMAN_.csv  |
+----------------------------------+

Since the 'COUNTRY_NAME' column holds the value 'INDIAGOOD', I will pick the file YYYY_DETAILS_INDIA_GOOD_.csv from ADLS, because the value matches a substring of that file name, and load it into my dataframe df2.


Solution

    • I have the following files in my ADLS folder.

    [Screenshot: files in the ADLS folder]

    • And I have a Dataframe called files_df with the following values:

    [Screenshot: contents of files_df]

    • Now the requirement is to check whether the name extracted from each file name appears in the column values of the above dataframe. If it does, the corresponding file's data (from ADLS) is loaded into a dataframe called final_df.
    • To do this I have used the following code. First, I have used dbutils.fs.ls() to get the list of contents of the year folder:
    files_in_adls = dbutils.fs.ls("abfss://[email protected]/Countries detail/year")
    print(files_in_adls)
    

    [Screenshot: output of print(files_in_adls)]
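
Since the question is about Synapse and `dbutils` is the Databricks utility, here is a minimal sketch of the listing step that does not depend on either one. It assumes the listing function returns objects with `.name` and `.path` attributes, as `dbutils.fs.ls` does; `list_csv_files`, `FakeFile`, `fake_ls` and the `/tmp/year` path are hypothetical names used only so the sketch runs outside a notebook.

```python
from collections import namedtuple


def list_csv_files(ls, folder):
    """Return the entries of `folder` whose names end in .csv.

    `ls` is any callable that takes a path and returns objects
    exposing `.name` and `.path` (assumption: same shape as dbutils.fs.ls).
    """
    return [f for f in ls(folder) if f.name.lower().endswith(".csv")]


# Stand-in for a real listing call (hypothetical data, not real ADLS paths).
FakeFile = namedtuple("FakeFile", ["name", "path"])


def fake_ls(folder):
    names = [
        "YYYY_DETAILS_INDIA_GOOD_.csv",
        "YYYY_DETAILS_GERMANY_GERMAN_.csv",
        "archive/",  # a sub-folder entry that should be skipped
    ]
    return [FakeFile(n, f"{folder}/{n}") for n in names]


csv_files = list_csv_files(fake_ls, "/tmp/year")
print([f.name for f in csv_files])
```

In a Synapse notebook, I would expect to pass `mssparkutils.fs.ls` as `ls`, and in Databricks `dbutils.fs.ls`; treat that as an assumption to verify in your own workspace.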

    • Then I have used the following loop to check whether the name extracted from each file is present in the column, and if so load that file's data into final_df using union.
    # Start with no data; the first matching file initialises final_df.
    final_df = None

    # Loop through the files listed from ADLS.
    for file in files_in_adls:
        # Join the name parts between the second underscore and the extension,
        # e.g. YYYY_DETAILS_INDIA_GOOD_.csv gives INDIAGOOD.
        req = ''.join(file.name.split('_')[2:-1])

        # Check whether the extracted name appears in any files_df column value.
        if files_df.filter(files_df.filenames.contains(req.upper())).count() > 0:
            df = spark.read.option("header", True).format('csv').load(file.path)
            # Append to final_df, or start it with the first matching file.
            final_df = df if final_df is None else final_df.union(df)
        else:
            print(f"{req} is not present in files_df")

    #display(final_df)
    

    [Screenshot: contents of final_df]
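
The name extraction in the loop can be tried on its own in plain Python. The sketch below wraps the same split/join expression in a hypothetical helper, `extract_key`, and adds one guard that is not in the code above: names that do not follow the YYYY_DETAILS_..._.csv pattern give an empty string, and an empty string would likely match every row in a `contains` check, so the helper returns None for them.

```python
def extract_key(file_name):
    """Turn 'YYYY_DETAILS_INDIA_GOOD_.csv' into 'INDIAGOOD'.

    Returns None when nothing sits between the second underscore
    and the last one, i.e. the name does not follow the pattern.
    """
    parts = file_name.split("_")
    key = "".join(parts[2:-1]).upper()
    return key or None


print(extract_key("YYYY_DETAILS_INDIA_GOOD_.csv"))
print(extract_key("YYYY_DETAILS_ENGLAND_PRODUCTS_.csv"))
print(extract_key("README.csv"))
```

A file for which this returns None can simply be skipped before any dataframe check is made.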

    Note: Here, each of my files has only one row, of the form "this is from <file name> file". Since there are 2 matching files, the data of both is loaded into the same dataframe (final_df).
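
With 200 files, running one `filter(...).count()` per file means one Spark job per file. A possible alternative, sketched here in plain Python, is to collect the distinct column values once and do the substring matching on the driver. `select_matching_paths`, `FakeFile` and the `/tmp/year` paths are hypothetical; the matching rule (column value contains the extracted name) is the same as in the loop above.

```python
from collections import namedtuple


def select_matching_paths(files, wanted_values):
    """Return paths of files whose extracted name occurs in any wanted value."""
    wanted = [v.upper() for v in wanted_values if v]
    paths = []
    for f in files:
        # Same extraction as in the loop: INDIA + GOOD -> INDIAGOOD.
        key = "".join(f.name.split("_")[2:-1]).upper()
        # Skip names that do not follow the pattern (empty key).
        if key and any(key in value for value in wanted):
            paths.append(f.path)
    return paths


# Stand-in listing so the sketch runs outside a notebook (hypothetical paths).
FakeFile = namedtuple("FakeFile", ["name", "path"])
names = [
    "YYYY_DETAILS_INDIA_GOOD_.csv",
    "YYYY_DETAILS_ENGLAND_PRODUCTS_.csv",
    "YYYY_DETAILS_GERMANY_GERMAN_.csv",
]
files = [FakeFile(n, f"/tmp/year/{n}") for n in names]

paths = select_matching_paths(files, ["INDIAGOOD"])
print(paths)
```

In the notebook, `wanted_values` could come from something like `[r[0] for r in files_df.select("filenames").distinct().collect()]`, and the selected paths could then be read in a single call such as `spark.read.option("header", True).csv(paths)`, assuming all matching files share the same columns.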