I am using spark-sql 2.3.1, Kafka, and Java 8 in my project, with
--driver-memory 4g \
--driver-cores 2 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \
On the consumer side I am trying to write the files to HDFS, using code like the following:
dataSet.writeStream()
.format("parquet")
.option("path", parquetFileName)
.option("mergeSchema", true)
.outputMode("Append")
.partitionBy("company_id","date")
.option("checkpointLocation", checkPtLocation)
.trigger(Trigger.ProcessingTime("25 seconds"))
.start();
When I store the data in the HDFS folder, it looks like the listing below, i.e. each file is around 1.5 KB, just a few KB:
$ hdfs dfs -du -h /transactions/company_id=24779/date=2014-06-24/
1.5 K /transactions/company_id=24779/date=2014-06-24/part-00026-1027fff9-5745-4250-961a-fd56508b7ea3.c000.snappy.parquet
1.5 K /transactions/company_id=24779/date=2014-06-24/part-00057-6604f6cc-5b8d-41f4-8fc0-14f6e13b4a37.c000.snappy.parquet
1.5 K /transactions/company_id=24779/date=2014-06-24/part-00098-754e6929-9c75-430f-b6bb-3457a216aae3.c000.snappy.parquet
1.5 K /transactions/company_id=24779/date=2014-06-24/part-00099-1d62cbd5-7409-4259-b4f3-d0f0e5a93da3.c000.snappy.parquet
1.5 K /transactions/company_id=24779/date=2014-06-24/part-00109-1965b052-c7a6-47a8-ae15-dea301010cf5.c000.snappy.parquet
Because of these small files, processing takes a lot of time, and when I read a larger set of data back from HDFS and count the number of rows, it results in the heap space error below.
2020-02-12 07:07:57,475 [Driver] ERROR org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.String.substring(String.java:1969)
at java.net.URI$Parser.substring(URI.java:2869)
at java.net.URI$Parser.parse(URI.java:3049)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.sql.execution.streaming.SinkFileStatus.toFileStatus(FileStreamSinkLog.scala:52)
at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex$$anonfun$2.apply(MetadataLogFileIndex.scala:46)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.streaming.MetadataLogFileIndex.<init>(MetadataLogFileIndex.scala:46)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:336)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at com.spgmi.ca.prescore.utils.DbUtils.loadFromHdfs(DbUtils.java:129)
at com.spgmi.ca.prescore.spark.CountRecords.main(CountRecords.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:684)
2020-02-12 07:07:57,533 [Reporter] WARN org.apache.spark.deploy.yarn.ApplicationMaster - Reporter thread fails 1 time(s) in a row.
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: dev1-dev.com":8030;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:805)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1497)
at org.apache.hadoop.ipc.Client.call(Client.java:1439)
at org.apache.hadoop.ipc.Client.call(Client.java:1349)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy22.allocate(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
at sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy23.allocate(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:296)
at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:249)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$allocationThreadImpl(ApplicationMaster.scala:540)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:606)
Caused by: java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:753)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:687)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:790)
at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:411)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1554)
Questions:
- Are these small files going to cause the "small files problem" in Spark processing? If so, how do I deal with this scenario?
- If I want to count the total number of records in a given HDFS folder, how do I do it?
- How do I know how much heap space is necessary to handle this kind of data?
After the new changes:
--driver-memory 16g \
--driver-cores 1 \
--num-executors 120 \
--executor-cores 1 \
--executor-memory 768m \
The run completed successfully; the results are:
2020-02-12 20:28:56,188 [Driver] WARN com.spark.mypackage.CountRecords - NUMBER OF PARTITIONS AFTER HDFS READ : 77926
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
| 24354| 94|
| 26425| 96|
| 32414| 64|
| 76143| 32|
| 16861| 32|
| 30903| 64|
| 40335| 32|
| 64121| 64|
| 69042| 32|
| 32539| 64|
| 34759| 32|
| 41575| 32|
| 1591| 64|
| 3050| 98|
| 51772| 32|
+--------------------+-----+
2020-02-12 20:50:32,301 [Driver] WARN com.spark.mypackage.CountRecords - RECORD COUNT: 3999708
Yes. Small files are not only a Spark problem; they also put unnecessary load on your NameNode. You should spend more time compacting your data and writing larger files than worrying about OOM when processing small files. Files well below the 64 MB / 128 MB block size are a sign that Hadoop is being used poorly.
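As a rough illustration of that compaction step (a sketch, not part of the original post), a periodic batch job could read the small files under the /transactions layout shown above and rewrite them as fewer, larger files into a hypothetical /transactions_compacted directory:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CompactTransactions {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CompactTransactions")
                .getOrCreate();

        // Read every small part file written by the streaming job.
        Dataset<Row> df = spark.read().parquet("/transactions");

        // Shuffle by the partition columns so each company_id/date directory
        // ends up with a handful of larger files instead of many tiny ones,
        // then rewrite to the (hypothetical) compacted location.
        df.repartition(col("company_id"), col("date"))
          .write()
          .mode(SaveMode.Overwrite)
          .partitionBy("company_id", "date")
          .parquet("/transactions_compacted");

        spark.stop();
    }
}

Run it on a schedule (or after stopping the stream) and point downstream readers at the compacted directory.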
Something like spark.read().parquet("hdfs://path").count() would read all of the files under the path and count the rows in the resulting Dataset.
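A self-contained Java version of that count (a sketch; the /transactions path comes from the listing in the question, the class name is only illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CountHdfsRecords {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CountHdfsRecords")
                .getOrCreate();

        // Read all parquet part files under the sink directory,
        // across every company_id/date partition.
        Dataset<Row> transactions = spark.read().parquet("/transactions");

        // count() forces a full scan; with tens of thousands of tiny files
        // the driver has to hold metadata for every one of them, which is
        // where the heap pressure in the stack trace above comes from.
        System.out.println("RECORD COUNT: " + transactions.count());

        spark.stop();
    }
}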
There is no hard-set number. You need to enable JMX monitoring on your jobs and see what heap size they actually reach. Otherwise, keep doubling the memory you give the job until it stops hitting OOM. If you start needing more than about 8 GB, consider reading less data in each job by adding more parallelization.
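For example, JMX can be turned on for the driver (where the OOM above occurred) through Spark's standard extra Java options; the port is just a placeholder:

--conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9010 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \

Then attach JConsole or VisualVM to that port and watch the heap while the job reads the directory.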
FWIW, Kafka Connect can also be used to output partitioned HDFS/S3 paths.