
How to read a list of path names as a PySpark dataframe


How do I read a list of paths as a PySpark dataframe in Azure Databricks?

Sample:

file_paths=[FileInfo(path='dbfs:/mnt/OUTPUT/path1.csv', name='path1.csv', size=520283136),
 FileInfo(path='dbfs:/mnt/OUTPUT/path2.csv', name='path2.csv', size=565211023),
 FileInfo(path='dbfs:/mnt/OUTPUT/path3.csv', name='path3.csv', size=545199423),
 FileInfo(path='dbfs:/mnt/OUTPUT/path4.csv', name='path4.csv', size=511559759),
 FileInfo(path='dbfs:/mnt/OUTPUT/path5.csv', name='path5.csv', size=532738818),
 FileInfo(path='dbfs:/mnt/OUTPUT/path6.csv', name='path6.csv', size=521915460)]
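
For context, a list of FileInfo objects like this is typically what Databricks' dbutils.fs.ls returns; a minimal sketch, assuming the same dbfs:/mnt/OUTPUT mount:

# list the mounted directory and keep only the CSV files
# (dbutils.fs.ls returns FileInfo objects with .path, .name and .size)
file_paths = [f for f in dbutils.fs.ls('dbfs:/mnt/OUTPUT/') if f.name.endswith('.csv')]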

Solution

  • If you want to iterate over this list, you can simply use a for loop:

    dfs = {}
    for file_info in file_paths:
      # key each dataframe by its file name without the .csv extension
      print(f'loading {file_info.name} from {file_info.path=}')
      dfs[file_info.name.replace('.csv','')] = spark.read.option('header',True).csv(file_info.path)

    dfs['path1'].select('seq','name','zip','date').show()
    
    > loading path1.csv from file_info.path='dbfs:/mnt/OUTPUT/path1.csv'
    > loading path2.csv from file_info.path='dbfs:/mnt/OUTPUT/path2.csv'
    > loading path3.csv from file_info.path='dbfs:/mnt/OUTPUT/path3.csv'
    > loading path4.csv from file_info.path='dbfs:/mnt/OUTPUT/path4.csv'
    > loading path5.csv from file_info.path='dbfs:/mnt/OUTPUT/path5.csv'
    > loading path6.csv from file_info.path='dbfs:/mnt/OUTPUT/path6.csv'
    +---+----------+-----+----------+
    |seq|      name|  zip|      date|
    +---+----------+-----+----------+
    |  1|   Rebecca|13336|11/15/2055|
    |  2|    Maggie|18310|04/21/1932|
    |  3|    Ronnie|93307|06/03/1950|
    |  4|     Lydia|88951|01/22/1945|
    |  5|      Lida|30880|08/13/2002|
    +---+----------+-----+----------+
    

    If all CSV files have the same schema but you want to apply different rules per file, you can load them all into a single dataframe with a "filename" column, as follows:

    from pyspark.sql.functions import input_file_name, split, element_at

    # extract the file name (last path segment) into a "filename" column
    df = (spark.read.option('header', True)
            .csv("dbfs:/mnt/OUTPUT/*.csv")
            .withColumn("filename", element_at(split(input_file_name(), '/'), -1)))
    df.show()
     
    
    +---+---------+-----+----------+---------+
    |seq|     name|  zip|      date| filename|
    +---+---------+-----+----------+---------+
    |  1|  Rebecca|13336|11/15/2055|path1.csv|
    |  2|   Maggie|18310|04/21/1932|path2.csv|
    |  3|   Ronnie|93307|06/03/1950|path3.csv|
    |  4|    Lydia|88951|01/22/1945|path4.csv|
    |  5|     Lida|30880|08/13/2002|path5.csv|
    +---+---------+-----+----------+---------+
    

    You can then use a .where clause on the filename column to apply different transformations per file.
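
    A minimal sketch of that pattern, assuming a purely illustrative rule that only applies to rows from path1.csv:

    from pyspark.sql.functions import col, upper

    # keep only the rows that came from path1.csv and apply a file-specific rule
    df_path1 = (df.where(col('filename') == 'path1.csv')
                  .withColumn('name', upper(col('name'))))
    df_path1.show()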

    There is also the recursiveFileLookup option, which I have found slightly more performant when dealing with lots of subdirectories:

    df = (spark.read
            .option("recursiveFileLookup", True)
            .option("header", True)
            .csv("dbfs:/mnt/OUTPUT/")
          )
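
    Alternatively, and closest to the original question, spark.read.csv also accepts a list of path strings, so you can feed it the paths from file_paths directly. A minimal sketch, assuming all the files share the same schema:

    # csv() accepts a list of paths, so the FileInfo list can be passed in one read
    df_all = spark.read.option('header', True).csv([f.path for f in file_paths])
    df_all.show()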