
How to load a huge number of small files in Spark on EMR


I am loading 50 GB of text data, initially split across 190 text files. I ran my Spark job on it and it worked great: the job completed in 12 minutes. The output of this job is again 50 GB, but with the default partitioning Spark created a huge number of small files.

Now I want to run my Spark job again on those output files. It runs horribly slowly; after two hours I had to stop my cluster.

I debugged and found that Spark was spending its time just loading the files, which is how I identified that the problem is the huge number of small files.

This is annoying: Spark wants to load big files, but by default it does not write big files.

How can I handle this situation?

I tried this:

val rdd = sc.textFile(mainFileURL, 10).repartition(10)

But then the step that extracts information from the file name failed with this error:

Caused by: java.lang.ArrayIndexOutOfBoundsException: 3

val get_cus_val = sqlContext.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
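For what it's worth, `ArrayIndexOutOfBoundsException: 3` means that `split("\\.")` returned fewer than four elements, which happens when the path string reaching the UDF is empty or does not contain the expected dots. A minimal sketch of a defensive version follows; `customerSegment` is a hypothetical helper name, and returning `None` for an unusable path is an assumption about what you want in that case.

```scala
// Hypothetical helper: returns the 4th dot-separated segment of a path,
// or None when the path is null or has fewer than four segments (e.g. "").
def customerSegment(filePath: String): Option[String] =
  Option(filePath).flatMap(_.split("\\.").lift(3))
```

Inside the registered UDF you could then call `customerSegment(filePath).getOrElse("")`, so a bad path produces an empty value you can filter on instead of failing the whole job.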

Will switching to wholeTextFiles work?

val rdd = sc.wholeTextFiles(mainFileURL)

When I do that, the line below fails with this error:

value contains is not a member of (String, String)

val header = rdd.filter(_.contains("FundamentalSeriesId")).map(line => line.split("\\|\\^\\|")).first()
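The compile error arises because `wholeTextFiles` yields one `(path, content)` pair per file rather than one `String` per line, so `_.contains(...)` is being called on a tuple. A rough sketch of how the pairs could be turned back into lines while keeping the path is shown below; `explodeFile` and `findHeader` are hypothetical helper names, written as plain Scala so the logic can be checked without a cluster.

```scala
// Hypothetical helper: turns one (path, content) pair, as produced by
// wholeTextFiles, into (path, line) pairs so line-level logic still works.
def explodeFile(path: String, content: String): Iterator[(String, String)] =
  content.split("\\r?\\n").iterator.map(line => (path, line))

// Hypothetical helper: finds the header line and splits it on the "|^|"
// delimiter, mirroring the filter/map/first chain from the question.
def findHeader(lines: Iterator[(String, String)]): Option[Array[String]] =
  lines.collectFirst {
    case (_, line) if line.contains("FundamentalSeriesId") =>
      line.split("\\|\\^\\|")
  }
```

On the cluster this would be used as `sc.wholeTextFiles(mainFileURL).flatMap { case (path, content) => explodeFile(path, content) }`, which gives an `RDD[(String, String)]` of path and line. Keep in mind that `wholeTextFiles` reads each file fully into memory, so it only suits files that are individually small.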

Can someone suggest how to handle this small-file problem?

Finally, I also use partitionBy columns, where I direct Spark to put certain records in specific partitions. But some of the partitions are huge, approximately 50 GB. If I partition further, the number of files will increase.

dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition", "PartitionYear")

Solution

  • I'm not sure which version of Spark you are running, but since you use sqlContext and sc.wholeTextFiles, I'm guessing it is some pre-2.x version. In general, Spark does not handle many small files well, and as has been suggested in the comments, I too highly recommend that you reduce the number of output files first. To do this without it taking forever, you need to repartition your DataFrame on the same columns before you call .write.partitionBy, so please try modifying your code like this:

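    import org.apache.spark.sql.functions.col

    // In Scala, repartition takes Column expressions rather than plain strings.
    dfMainOutputFinalWithoutNull
      .repartition(col("DataPartition"), col("PartitionYear"))
      .write
      .partitionBy("DataPartition", "PartitionYear")
      ...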
    

    That should speed up the job significantly :)
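One caveat, given the roughly 50 GB partitions mentioned in the question: repartitioning on the two columns alone sends each `(DataPartition, PartitionYear)` combination to a single task, so a large combination ends up as one very large file. A possible variation, sketched below under assumptions, adds a random "salt" column so each combination is spread over a bounded number of tasks. The helper name `writeSalted`, the `salt` column, the `outputPath` and `filesPerPartition` parameters, and the use of the default output format via `save` are all illustrative choices, not part of the original code.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, rand}

// Hypothetical helper: spreads each (DataPartition, PartitionYear) group over
// at most `filesPerPartition` tasks, so each output directory receives a few
// medium-sized files instead of one very large file.
def writeSalted(df: DataFrame, outputPath: String, filesPerPartition: Int): Unit = {
  df.withColumn("salt", (rand() * filesPerPartition).cast("int")) // 0 until filesPerPartition
    .repartition(col("DataPartition"), col("PartitionYear"), col("salt"))
    .drop("salt") // the salt only steers the shuffle; it is not written out
    .write
    .partitionBy("DataPartition", "PartitionYear")
    .save(outputPath) // assumption: default data source format
}
```

Each directory then holds at most `filesPerPartition` files (fewer if several salt values hash to the same task), so the value trades file size against file count.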