Search code examples
scalaapache-sparkhdfsspark-avro

Spark on Cluster: Read Large number of small avro files is taking too long to list


I know this problem of reading large number of small files in HDFS have always been an issue and been widely discussed, but bear with me. Most of the stackoverflow problems dealing with this type of issue concerns with reading a large number of txt files.I'm trying to read a large number of small avro files

Plus these reading txt files solutions talk about using WholeTextFileInputFormat or CombineInputFormat (https://stackoverflow.com/a/43898733/11013878) which are RDD implementations, I'm using Spark 2.4 (HDFS 3.0.0) and RDD implementations are generally discouraged and dataframes are preferred. I would prefer using dataframes but am open to RDD implementations as well.

I've tried unioning dataframes as suggested by Murtaza, but on a large number of files I get OOM error (https://stackoverflow.com/a/32117661/11013878)

I'm using the following code

val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String] 
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
      val df = df_mid
        .withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
        .filter("dt != 'null'")

      df
        .repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
        .write.partitionBy(partitionColumns(inputs.logSubType): _*)
        .mode(SaveMode.Append)
        .option("compression","snappy")
        .parquet(avroConsolidator.parquetFilePath.toString)

It took 1.6 mins to list 183 small files at the job level Jobs UI

Weirdly enough my stage UI page just shows 3s(dont understand why) Stage UI

The avro files are stored in yyyy/mm/dd partitions: hdfs://server123:8020/source/Avro/weblog/2019/06/03

Is there any way I can speed the Listing of leaf files, as you can from screenshot it takes only 6s to consilidate into parquet files, but 1.3 mins to list the files


Solution

  • Since it's taking too long to read large number of small files, I took a step back, and created RDDs using CombineFileInputFormat. This This InputFormat works well with small files, because it packs many of them into one split so there are fewer mappers, and each mapper has more data to process.

    Here's what I did:

    def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {
    
       val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
       FileInputFormat.setInputPaths(job, filePaths: _*)
       val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))
    
       val rddKV = sc.sparkContext.newAPIHadoopRDD(
                       job.getConfiguration,
                       classOf[CombinedAvroKeyInputFormat[GenericRecord]],
                       classOf[AvroKey[GenericRecord]],
                       classOf[NullWritable])
    
       val rowRDD = rddKV.mapPartitions(
                      f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
                           iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
                           , preservesPartitioning = true)
    
       val df = sc.sqlContext.createDataFrame(rowRDD , 
                  sqlType.dataType.asInstanceOf[StructType])
       df
    

    CombinedAvroKeyInputFormat is user defined class which extends CombineFileInputFormat and puts 64MB of data in a single split.

    object CombinedAvroKeyInputFormat {
    
      class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
        extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
      {
        @throws[IOException]
        @throws[InterruptedException]
        override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
          this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
          val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
                                        this.inputSplit.getOffset(idx),
                                        this.inputSplit.getLength(idx),
                                        this.inputSplit.getLocations)
          super.initialize(fileSplit, context)
        }
      }
    
    }
    
    /*
     * The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
     * We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
     */
    
    class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
      val logger = Logger.getLogger(AvroConsolidator.getClass)
      setMaxSplitSize(67108864)
      def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
        val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
        val inputSplit = split.asInstanceOf[CombineFileSplit]
    
        /*
         * CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
         * When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
         * and decides how many splits base on the MaxSplitSize
         */
        return new CombineFileRecordReader[AvroKey[T], NullWritable](
          inputSplit,
          context,
          c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
      }
    }
    

    This made reading of small files a lot faster