Tags: apache-spark, amazon-s3, apache-spark-sql, hdfs, parquet

How to avoid the small file problem while writing to HDFS & S3 from Spark SQL streaming


I am using spark-sql 2.3.1 and Kafka with Java 8 in my project. The job is submitted with:

--driver-memory 4g \
--driver-cores 2 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

On the consumer side, I am trying to write the files to HDFS, using code like this:

    dataSet.writeStream()
           .format("parquet")
           .option("path", parqetFileName)
           .option("mergeSchema", true)
           .outputMode("Append")
           .partitionBy("company_id","date")
           .option("checkpointLocation", checkPtLocation)
           .trigger(Trigger.ProcessingTime("25 seconds"))
           .start();

When the output is stored in the HDFS folder it looks like the listing below: each file is only about 1.5 KB.

$ hdfs dfs -du -h /transactions/company_id=24779/date=2014-06-24/
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00026-1027fff9-5745-4250-961a-fd56508b7ea3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00057-6604f6cc-5b8d-41f4-8fc0-14f6e13b4a37.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00098-754e6929-9c75-430f-b6bb-3457a216aae3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00099-1d62cbd5-7409-4259-b4f3-d0f0e5a93da3.c000.snappy.parquet
1.5 K  /transactions/company_id=24779/date=2014-06-24/part-00109-1965b052-c7a6-47a8-ae15-dea301010cf5.c000.snappy.parquet

Because of these small files, processing takes a long time. When I read a larger set of data back from HDFS and count the number of rows, it fails with the heap space error below.

2020-02-12 07:07:57,475 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at java.lang.String.substring(String.java:1969)
        at java.net.URI$Parser.substring(URI.java:2869)
        at java.net.URI$Parser.parse(URI.java:3049)
        at java.net.URI.<init>(URI.java:588)
        at org.apache.spark.sql.execution.streaming.SinkFileStatus.toFileStatus(FileStreamSinkLog.scala:52)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:336)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
        at com.spgmi.ca.prescore.utils.DbUtils.loadFromHdfs(DbUtils.java:129)
        at com.spgmi.ca.prescore.spark.CountRecords.main(CountRecords.java:84)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
2020-02-12 07:07:57,533 [Reporter] WARN  org.apache.spark.deploy.yarn.ApplicationMaster - Reporter thread fails 1 time(s) in a row.
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: dev1-dev.com":8030;
        at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:805)
        at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1497)
        at org.apache.hadoop.ipc.Client.call(Client.java:1439)
        at org.apache.hadoop.ipc.Client.call(Client.java:1349)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy22.allocate(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
        at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy23.allocate(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:296)
        at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:249)
        at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$allocationThreadImpl(ApplicationMaster.scala:540)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:606)
Caused by: java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:753)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
        at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:687)
        at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:790)
        at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:411)
        at org.apache.hadoop.ipc.Client.getConnection(Client.java:1554)

Questions:

  1. Are these small files going to cause the "small file problem" in Spark processing? If so, how should I deal with it?

  2. If I want to count the total number of records in a given HDFS folder, how do I do it?

  3. How do I know how much heap space is necessary to handle this kind of data?

After changing the settings to:

--driver-memory 16g \
--driver-cores 1 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \

the job ran successfully. The results are:

2020-02-12 20:28:56,188 [Driver] WARN  com.spark.mypackage.CountRecords -  NUMBER OF PARTITIONS AFTER HDFS READ : 77926
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|               24354|   94|
|               26425|   96|
|               32414|   64|
|               76143|   32|
|               16861|   32|
|               30903|   64|
|               40335|   32|
|               64121|   64|
|               69042|   32|
|               32539|   64|
|               34759|   32|
|               41575|   32|
|                1591|   64|
|                3050|   98|
|               51772|   32|
+--------------------+-----+

2020-02-12 20:50:32,301 [Driver] WARN  com.spark.mypackage.CountRecords -  RECORD COUNT: 3999708
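
As a quick sanity check on the two figures logged above, the sketch below divides the record count by the number of read partitions. The class and method names are illustrative only. It says nothing about the number of files on disk, since Spark may pack several small files into one read partition.

```java
// Illustrative helper, not part of the original job: average rows per read partition.
final class ReadStats {

    static double averageRowsPerPartition(long rows, long partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        return (double) rows / partitions;
    }

    public static void main(String[] args) {
        long recordCount = 3_999_708L;   // "RECORD COUNT" from the log above
        long readPartitions = 77_926L;   // "NUMBER OF PARTITIONS AFTER HDFS READ" from the log above
        double avg = averageRowsPerPartition(recordCount, readPartitions);
        System.out.println("average rows per read partition: " + avg);
    }
}
```

That works out to roughly 51 rows per partition, so each of the almost 78,000 tasks does very little useful work relative to its scheduling overhead.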

Solution

    1. Yes. Small files are not only a Spark problem: they also put unnecessary load on your NameNode. You should spend more time compacting the data and uploading larger files than worrying about OOM when processing small files. Files far smaller than the HDFS block size (64 MB / 128 MB) are a sign that you are using Hadoop poorly.

    2. Something like spark.read().parquet("hdfs://path").count() would read all the files under the path, then count the rows in the DataFrame.

    3. There is no hard-set number. Enable JMX monitoring on your jobs and see how large the heap actually gets. Otherwise, keep doubling the memory you give the job until it stops hitting OOM. Note that in your stack trace the OOM is thrown on the driver while it builds the file list (MetadataLogFileIndex), so it is the driver memory that matters here. If you start approaching more than 8 GB, consider reading less data in each job by adding more parallelism.
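
For the compaction suggested in point 1, a first step is deciding how many output files a partition directory should be rewritten into. The following is a minimal sketch of that arithmetic only, assuming a 128 MB target file size; `CompactionPlan` and `targetFileCount` are hypothetical names, not a Spark or Hadoop API.

```java
// Hypothetical helper: names are illustrative, not a Spark or Hadoop API.
final class CompactionPlan {

    // Assumed target size per output file; 128 MB is a common HDFS block size.
    static final long TARGET_FILE_BYTES = 128L * 1024 * 1024;

    /** Number of output files needed so that each holds at most targetBytes (at least 1). */
    static int targetFileCount(long totalBytes, long targetBytes) {
        if (totalBytes < 0 || targetBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be >= 0 and targetBytes > 0");
        }
        long files = (totalBytes + targetBytes - 1) / targetBytes; // ceiling division
        return (int) Math.max(1L, Math.min(files, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // A directory holding 1,000 files of about 1.5 KB each fits in a single file.
        long smallDirBytes = 1_000L * 1_536L;
        System.out.println(targetFileCount(smallDirBytes, TARGET_FILE_BYTES)); // prints 1

        // 1 GiB of data at a 128 MiB target needs 8 files.
        long oneGiB = 1024L * 1024 * 1024;
        System.out.println(targetFileCount(oneGiB, TARGET_FILE_BYTES)); // prints 8
    }
}
```

The resulting number would typically be passed to something like `repartition(n)` in a periodic batch job that reads one partition directory and rewrites it; that job is not shown here.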


    FWIW, Kafka Connect can also be used to output partitioned HDFS/S3 paths.
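
    Regarding the JMX suggestion in point 3: the heap figures that JMX tools display come from the JVM's standard memory MXBean, which can also be read from inside the application. Below is a minimal sketch using only the JDK; `HeapProbe` is an illustrative name, and where to call it (for example, on the driver right after loading the data) is an assumption, not something taken from the question.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Illustrative helper: reports the current JVM heap usage via the platform MemoryMXBean.
final class HeapProbe {

    private static final long MB = 1024L * 1024;

    /** Bytes of heap currently in use by this JVM. */
    static long usedHeapBytes() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        return memory.getHeapMemoryUsage().getUsed();
    }

    /** Human-readable summary of used, committed and maximum heap. */
    static String describeHeap() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the maximum is undefined
        String maxText = (max < 0) ? "undefined" : (max / MB) + " MB";
        return "heap used=" + (heap.getUsed() / MB) + " MB"
                + ", committed=" + (heap.getCommitted() / MB) + " MB"
                + ", max=" + maxText;
    }

    public static void main(String[] args) {
        // Log this before and after the expensive step to see how much the heap grows.
        System.out.println(describeHeap());
    }
}
```

    Logging this around the read gives a rough idea of how close the job gets to its configured maximum, which is more informative than doubling the memory blindly.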