Tags: scala, apache-spark, databricks

Using Spark to process multiple files in parallel without combining them at the end


I currently have a list of .txt.gz files that are .tsv in nature (I cannot control the source file format, and they're all >10GB) that I essentially need to read in, unzip, and write out to a destination file path in parquet format for later processing.

Each file is different in nature and shouldn't be combined - so I can't pass the full path.

Right now I have a simple UDF that does some preprocessing on a filename, reads the file in, and then attempts to write it back out to the destination directory. I pass this UDF and a list of filenames to a .map function, hoping that the work gets spread out across executors. A simple example: if I had 4 files and 4 executors, each with 3 cores, then each executor would get its own file, unzip/read it in (ideally onto 3 partitions, one for each of the 3 cores), and then write it back out in parquet format.

// Find a file, unzip it, partition it, and write to parquet with no filename extension
def unzip_and_write_delta(fileName: String, outFileDir: String): Int = {
  // Pick whichever mounted source path actually holds this file
  val inFileDir: String = if (domino_source.contains(fileName)) mounted_source_filepath1 else mounted_source_filepath2
  val inFullPath: String = inFileDir.concat(fileName)

  // Drop the compression extension so the output directory is just the base name
  val fileOutputName = fileName.replace(".txt.gz", "") //.replace(".rar", "")
  val outFullPath: String = outFileDir.concat(fileOutputName)

  // Read the gzipped TSV and write it back out as parquet
  spark.read.option("sep", "\t").option("header", "true").option("numPartitions", 3).csv(inFullPath).write.parquet(outFullPath)
  // val returned = sc.textFile(inFullPath).map(_.split("\t")).write.parquet(outFullPath)
  return 0
}
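
For context on the partitioning above: gzip is not a splittable codec, so a single .txt.gz file is always read onto one partition regardless of the numPartitions option, and spreading the subsequent write across cores generally needs an explicit repartition after the read. A minimal sketch of that variation (the repartition(3) count and the helper name are illustrative assumptions, not part of the original code):

import org.apache.spark.sql.SparkSession

// Sketch: a gzipped file can't be split across tasks, so the read produces a
// single partition; repartition(3) spreads the rows out before the parquet write.
def unzipAndWriteRepartitioned(spark: SparkSession, inFullPath: String, outFullPath: String): Unit = {
  spark.read
    .option("sep", "\t")
    .option("header", "true")
    .csv(inFullPath)   // lands on one partition: .gz is not splittable
    .repartition(3)    // assumed count, one partition per core in the example
    .write
    .parquet(outFullPath)
}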

This function is then fed to a map function:

val out_val = full_filelist.map(
  f => unzip_and_write_delta(fileName=f, outFileDir=mounted_target_filepath)
)

The sc.textFile command seemed to work yesterday, but the results were filled with bad encodings, so I switched to the spark.read approach today. Now neither of them seems to do anything in parallel: each file is loaded sequentially onto a single partition on a single executor, and the entire file is read and then written out, ultimately bypassing any efficiency gains.

[Executor panel screenshot]

I've tried multiple combinations of the spark.read and sc.textFile commands, with and without a repartition call, with and without the numPartitions option, and with and without the list.map() function, and nothing seems to run in parallel.

At a minimum I'm hoping to have each executor process its own file, and as a bonus to be able to read and process across cores (I can't find much for csv, but there seem to be options for textFile...although the encodings don't work well).
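
For what it's worth on the encoding side, the DataFrame CSV reader does accept an explicit charset through the encoding option (how strictly it is honored can depend on the Spark version and reader mode). A minimal sketch; the "ISO-8859-1" value is an assumed charset for the source data, not something known here:

// Sketch: read the gzipped TSV with an explicit charset instead of the default UTF-8
val df = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .option("encoding", "ISO-8859-1") // assumed charset; adjust to the real source encoding
  .csv(inFullPath)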


Solution

  • Getting things to at least run in parallel was solved by adding .par to the map function:

    val out_val = full_filelist.par.map(
      f => unzip_and_write_delta(fileName=f, outFileDir=mounted_target_filepath)
    )
    

    which resulted in this executor panel: [Executor panel screenshot]
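
  • A couple of caveats on .par, sketched below: on Scala 2.12 (what Databricks runtimes typically ship) .par is available out of the box, while on Scala 2.13+ it needs the separate scala-parallel-collections module and the import shown; the pool size of 4 is an assumption matching the 4-executor example above, not a required value.

    // Scala 2.13+ only: needs org.scala-lang.modules:scala-parallel-collections on the classpath
    import scala.collection.parallel.CollectionConverters._
    import scala.collection.parallel.ForkJoinTaskSupport
    import java.util.concurrent.ForkJoinPool

    val parList = full_filelist.par
    // Cap how many files the driver submits to Spark at once
    parList.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

    val out_val = parList.map(
      f => unzip_and_write_delta(fileName=f, outFileDir=mounted_target_filepath)
    )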