
coalesce does not reduce the number of output files


I have a Spark job that manages an RDD[SpecificRecordBase] on HDFS.

My problem is that it generates a lot of files, about 95% of which are empty Avro files. I tried to use coalesce to reduce the number of partitions in my RDD, and thus the number of output files, but it has no effect.

 import org.apache.avro.mapred.AvroKey
 import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
 import org.apache.avro.specific.SpecificRecordBase
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}

 def write(data: RDD[SpecificRecordBase]) = {
   data.coalesce(1, false)    // has no effect
   val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))

   val conf = new Configuration()
   val job = new org.apache.hadoop.mapreduce.Job(conf)

   AvroJob.setOutputKeySchema(job, schema)
   val pair = new PairRDDFunctions(rdd)
   pair.saveAsNewAPIHadoopFile(
     outputAvroDataPath,
     classOf[AvroKey[SpecificRecordBase]],
     classOf[org.apache.hadoop.io.NullWritable],
     classOf[AvroKeyOutputFormat[SpecificRecordBase]],
     job.getConfiguration)
 }

I suppose something is lost between the RDD's partition configuration and the HDFS partitioning, and maybe saveAsNewAPIHadoopFile does not take it into account, but I'm not sure.

Have I missed something?

Could somebody explain what really happens when calling saveAsNewAPIHadoopFile with respect to RDD partitioning?


Solution

  • Answering my own question thanks to @0x0FFF: coalesce returns a new RDD instead of modifying the one it is called on, so its result has to be assigned and used. The correct code should be:

        def write(data: RDD[SpecificRecordBase]) = {
          val rdd = data.map(t => (new AvroKey(t), org.apache.hadoop.io.NullWritable.get))
          val rdd1Partition = rdd.coalesce(1, false)  // keep the returned RDD: it has 1 partition

          val conf = new Configuration()
          val job = new org.apache.hadoop.mapreduce.Job(conf)

          AvroJob.setOutputKeySchema(job, schema)
          val pair = new PairRDDFunctions(rdd1Partition)  // so only one file will be written
          pair.saveAsNewAPIHadoopFile(
            outputAvroDataPath,
            classOf[AvroKey[SpecificRecordBase]],
            classOf[org.apache.hadoop.io.NullWritable],
            classOf[AvroKeyOutputFormat[SpecificRecordBase]],
            job.getConfiguration)
        }
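
    For the record, the underlying point is that RDDs are immutable: coalesce returns a new RDD and leaves the one it is called on untouched, and saveAsNewAPIHadoopFile writes one part file per partition of the RDD it receives. A minimal sketch of this behavior (the SparkContext sc and the sample data are hypothetical):

        val rdd = sc.parallelize(1 to 100, 10)  // an RDD with 10 partitions

        rdd.coalesce(1, false)                  // returned RDD is discarded; rdd is unchanged
        println(rdd.partitions.length)          // still prints 10

        val one = rdd.coalesce(1, false)        // capture the returned RDD instead
        println(one.partitions.length)          // prints 1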
    

    Thank you again!