Tags: apache-spark, pyspark, azure-databricks

Faster approach to Spark wholeTextFiles for multiple unformatted files (PySpark)


I'm using Spark to read many small files. Each file is in a client-specific format and contains multiple tables (each with a different structure). We have written a Python parser that works and derives the partitioning from the file path (a minimal sketch of such a parser follows the layout below). Let me explain the directory layout:

folder
|- file_number=0001
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
|- file_number=0002
   |- year=2019
      |- month=10
         |- day=21
            |- hour=17
               |- file.txt
etc
.
.
.
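
To make the setup concrete, here is a minimal sketch of what such a parser could look like. The table framing (headers starting with '##') and the row format are invented for illustration; only the contract matters: the parser receives the file content plus its path and returns a list of (table_name, list_of_row_dicts) tuples.

import re

def parser(content, path):
    """Hypothetical sketch of the client-specific parser (illustration only)."""
    # Recover the partition columns encoded in the path, e.g.
    # .../file_number=0001/year=2019/month=10/day=21/hour=17/file.txt
    partitions = dict(re.findall(r'(\w+)=([^/]+)', path))

    # Assume each table starts with a header line like "## table_name" followed
    # by its rows -- this framing is invented; only the return shape matters.
    results = []
    for block in content.split('##')[1:]:
        lines = block.strip().splitlines()
        table_name, raw_rows = lines[0].strip(), lines[1:]
        rows = [dict({'raw': line}, **partitions) for line in raw_rows]
        results.append((table_name, rows))
    return results   # list of (table_name, list_of_dicts) tuples, hence flatMap below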

So the naive approach is:

# Each record of wholeTextFiles is a pair (path, file content)
rdd = (sc.wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt')
       # parser() is plain Python and runs fast. It uses the path to pick up the
       # partitioning and returns a list of tuples, hence the flatMap.
       .flatMap(lambda x: parser(x[1], x[0]))
       # The key is the table name; the value is the data as a list of dicts in a tabular-like style.
       .foldByKey([], lambda x, y: x + y))

The wholeTextFiles('/path/to/file_number=*/year=*/month=*/day=*/hour=*/*.txt') step takes an insane amount of time, while the rest takes comparatively little.

According to this blog, the problem might be some recursive listing call, so it is much better to list all the files first and then read each file. I haven't been able to use Hadoop's FileSystem.open(path) as proposed in the link because I am working on Azure Data Lake Gen2. But it is true that listing all the files with dbutils.fs is fast.
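
For reference, the listing itself can be a small recursive walk with dbutils.fs.ls; this is only a sketch of that step, and the ADLS Gen2 URI in the usage comment is a placeholder:

def list_txt_files(root):
    """Recursively collect the paths of all *.txt files under `root`."""
    paths = []
    for info in dbutils.fs.ls(root):
        if info.isDir():
            paths.extend(list_txt_files(info.path))
        elif info.path.endswith('.txt'):
            paths.append(info.path)
    return paths

# Usage (the URI below is a placeholder):
# list_of_all_files_paths = list_txt_files('abfss://<container>@<account>.dfs.core.windows.net/folder/')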

So the question is: how can I use such a list to read and parse the files in parallel? The problem is that wholeTextFiles does not accept a list as an argument. I have tried all of the following:

list_of_all_files_paths = dbutils.someCode()

# Attempt 1: Type mismatch
rdd = sc.wholeTextFiles(list_of_all_files_paths)

# Attempt 2: Works, but all the partitioning info (encoded in the path) is lost
rdd = spark.read.text(list_of_all_files_paths, wholetext=True)

# Attempt 3:  Takes a lot of time too
rdd = spark.read.text('path/to/')

# Attempt 4: This is the most successful approach... but it looks bad and is not very fast either...
rdd = sc.emptyRDD()
for path in list_of_all_files_paths:
  newRDD = sc.wholeTextFiles(path)
  rdd    = rdd.union(newRDD)
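
As a side note on that last attempt: if the union route is taken anyway, a single SparkContext.union over a list of RDDs avoids the deeply nested lineage built by pairwise unions in a loop. This does not fix the underlying listing cost; it is just a cleaner sketch of the same idea:

# Build one RDD per path, then union them in one call instead of a loop.
rdds = [sc.wholeTextFiles(path) for path in list_of_all_files_paths]
rdd = sc.union(rdds)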

Solution

  • As @jxc answered in the comments, the solution is very simple:

    rdd = sc.wholeTextFiles(','.join(list_of_all_files_paths))
    

    It turns out that a comma-separated string of paths is a valid input. The runtime is still high due to the amount of I/O, but at least the listing part has become very small.
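
    Wired back into the original pipeline, the accepted answer looks like this (nothing new here, just the earlier pieces combined):

    paths_csv = ','.join(list_of_all_files_paths)

    result = (sc.wholeTextFiles(paths_csv)
                .flatMap(lambda x: parser(x[1], x[0]))
                .foldByKey([], lambda x, y: x + y))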