Tags: python, csv, apache-spark, pyspark, apache-spark-sql

PySpark: How to read multiple CSV files with different column positions most efficiently


I am trying to read multiple CSV files in a directory using Spark in the most efficient way possible. Unfortunately, I haven't found a better way to do this than reading each file individually, which is quite time-consuming.

From what I understood, the most efficient way to read multiple CSV files is to use a wildcard (*), as follows:

df = spark.read.format('csv') \
        .option('header', 'true') \
        .load('/path/to/csv/folder/*.csv')

However, although it is very fast, it does not merge the files by column name; it aligns columns by position. If, for instance, the directory contains the following two CSV files:

1.csv:

A B C
1 2 5
3 4 6

2.csv:

A C
7 8

the previous operation will merge them like this:

df:

A B C
1 2 5
3 4 6
7 8 NULL

This is obviously incorrect since the last row should read 7|NULL|8.

Well, I was able to solve this problem by reading each file individually and then doing a unionByName with the allowMissingColumns parameter set to True, as follows:

dfs = []
for filename in list_file_names('/path/to/csv/folder'):
    dfs.append(spark.read.format('csv') \
        .option('header', 'true') \
        .load(f'/path/to/csv/folder/{filename}')
    )
union_df = dfs[0]
for df in dfs[1:]:
    union_df = union_df.unionByName(df, allowMissingColumns=True)

This works as expected, but since each file is read individually, it is much slower. For 100 small CSV files in HDFS on the same machine, the first (but wrong) method finishes in around 6 seconds, while the second takes 16 seconds.

So my question is, can I achieve the same result in PySpark by performing only one read operation as in the first method?


Solution

  • can I achieve the same result in PySpark by performing only one read operation

    Sadly, you can't do it with the Spark data source API in one go, due to the schema-merging limitation you noticed.

    Instead, you could optimize your union approach: first read the header of each file, group the files by schema, and then union one DataFrame per group.

    Collecting the first row (the header) of each file together with its path can be done in pure Python, for example with boto if the files live on S3.

    Then each group of files can be read in a single call by passing a comma-separated list of paths (or a Python list) to load.

    Although this takes two steps, if you only have a few distinct schemas it should be much faster than unioning every single file. A rough sketch of the idea follows.
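
    A minimal sketch of that approach, assuming the files are reachable from the driver (listed here with glob for a local path; on HDFS or S3 you would list the files and read their headers with a suitable client such as boto3 instead), and that spark is an existing SparkSession:

    from collections import defaultdict
    from functools import reduce
    import glob

    # Group files by their header line, read in plain Python on the driver.
    groups = defaultdict(list)
    for path in glob.glob('/path/to/csv/folder/*.csv'):
        with open(path) as f:
            header = f.readline().strip()
        groups[header].append(path)

    # One Spark read per distinct header: load() accepts a list of paths,
    # so each group of identically structured files is read in a single call.
    dfs = [
        spark.read.format('csv')
            .option('header', 'true')
            .load(paths)
        for paths in groups.values()
    ]

    # Union the few per-schema DataFrames by column name.
    union_df = reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True), dfs
    )

    With only a handful of distinct headers, this keeps the number of read operations and unions small regardless of how many files the directory contains.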