How do I read a list of paths as a PySpark dataframe (Azure Databricks)?
Sample:
file_paths = [FileInfo(path='dbfs:/mnt/OUTPUT/path_1.csv', name='path_1.csv', size=520283136),
              FileInfo(path='dbfs:/mnt/OUTPUT/path_2.csv', name='path_2.csv', size=565211023),
              FileInfo(path='dbfs:/mnt/OUTPUT/path_3.csv', name='path_3.csv', size=545199423),
              FileInfo(path='dbfs:/mnt/OUTPUT/path_4.csv', name='path_4.csv', size=511559759),
              FileInfo(path='dbfs:/mnt/OUTPUT/path_5.csv', name='path_5.csv', size=532738818),
              FileInfo(path='dbfs:/mnt/OUTPUT/path_6.csv', name='path_6.csv', size=521915460)]
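A list like this is typically what dbutils.fs.ls returns on Azure Databricks; assuming that is where it comes from, it would have been produced along these lines:
# Assumed origin of the sample above: listing the mounted folder with dbutils
file_paths = dbutils.fs.ls('dbfs:/mnt/OUTPUT')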
If you want to iterate over this list, you can simply use a for loop:
dfs = {}
for file_info in file_paths:
    print(f'loading {file_info.name} from {file_info.path=}')
    # one dataframe per file, keyed by the file name without its .csv extension
    dfs[file_info.name.replace('.csv', '')] = spark.read.option('header', True).csv(file_info.path)

dfs['path_1'].select('seq', 'name', 'zip', 'date').show()
> loading path_1.csv from file_info.path='dbfs:/mnt/OUTPUT/path_1.csv'
> loading path_2.csv from file_info.path='dbfs:/mnt/OUTPUT/path_2.csv'
> loading path_3.csv from file_info.path='dbfs:/mnt/OUTPUT/path_3.csv'
> loading path_4.csv from file_info.path='dbfs:/mnt/OUTPUT/path_4.csv'
> loading path_5.csv from file_info.path='dbfs:/mnt/OUTPUT/path_5.csv'
+---+----------+-----+----------+
|seq|      name|  zip|      date|
+---+----------+-----+----------+
|  1|   Rebecca|13336|11/15/2055|
|  2|    Maggie|18310|04/21/1932|
|  3|    Ronnie|93307|06/03/1950|
|  4|     Lydia|88951|01/22/1945|
|  5|      Lida|30880|08/13/2002|
+---+----------+-----+----------+
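Alternatively, if you just want one dataframe built from exactly those files, spark.read.csv also accepts a Python list of path strings, so the FileInfo list can be fed to a single read (a minimal sketch based on the sample above):
# csv() accepts a list of paths as well as a single string,
# so the whole FileInfo list can become one dataframe in a single read
paths = [file_info.path for file_info in file_paths]
df_all = spark.read.option('header', True).csv(paths)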
If all CSV files have the same schema but you want to apply different rules per file, you can load them all into a single dataframe with a "filename" column as follows:
from pyspark.sql.functions import input_file_name, split, element_at

# split the full input file path on '/' and keep only its last segment as "filename"
df = (spark.read.option('header', True)
      .csv("dbfs:/mnt/OUTPUT/*.csv")
      .withColumn("filename", element_at(split(input_file_name(), '/'), -1)))
df.show()
+---+----------+-----+----------+------------+
|seq|      name|  zip|      date|    filename|
+---+----------+-----+----------+------------+
|  1|   Rebecca|13336|11/15/2055|  path_1.csv|
|  2|    Maggie|18310|04/21/1932|  path_2.csv|
|  3|    Ronnie|93307|06/03/1950|  path_3.csv|
|  4|     Lydia|88951|01/22/1945|  path_4.csv|
|  5|      Lida|30880|08/13/2002|  path_5.csv|
+---+----------+-----+----------+------------+
You can then use the .where clause to apply different transformations to rows depending on their filename, as sketched below.
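For example, a minimal sketch of filtering on that column (the date-parsing rule tied to path_1.csv is purely illustrative):
from pyspark.sql.functions import col, to_date

# Hypothetical per-file rule: parse the date string only for rows
# that came from path_1.csv
df_path1 = (df.where(col('filename') == 'path_1.csv')
              .withColumn('date', to_date(col('date'), 'MM/dd/yyyy')))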
There is also the recursiveFileLookup option, which I have found slightly more performant when dealing with lots of subdirectories:
df = (spark.read
      .option("recursiveFileLookup", True)
      .option("header", True)   # keep header handling consistent with the reads above
      .csv("dbfs:/mnt/OUTPUT/")
     )
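If the directory tree also contains other file types, the recursive lookup can be combined with the pathGlobFilter option so that only CSVs are picked up (a minimal sketch reusing the same mount path):
# walk OUTPUT recursively but only read files whose names match *.csv
df = (spark.read
      .option("recursiveFileLookup", True)
      .option("pathGlobFilter", "*.csv")
      .option("header", True)
      .csv("dbfs:/mnt/OUTPUT/"))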