Tags: java, apache-spark, rdd, emr, amazon-emr

Java Spark collect() javaRdd fails with Memory errors (EMR cluster)


I'm running a Spark app on an AWS EMR (Elastic MapReduce) cluster. The master node has 8 vCores, 15 GiB of memory, and 80 GB of SSD storage. The executor nodes have the same specification: 8 vCores, 15 GiB of memory, and 80 GB of SSD storage.

I have a 600 MB CSV input file. I am trying to read it into a JavaRDD and then use collect() to convert it to a List of objects.

Here is my code:

JavaRDD<WebLabPurchasesDataObject> allRecords = context.textFile(inputFile).map (
        data -> {
            String[] fields = data.split(",", -1);

            String hitDay = fields[0];
            String treatmentName = fields[1];
            String sessionId = fields[2];

            return new WebLabPurchasesDataObject(hitDay,treatmentName,sessionId);   
        });

allRecords.cache();

List<WebLabPurchasesDataObject> webLabRddAllRecordsList = allRecords.collect();

Every time I run this code, I get java.lang.OutOfMemoryError: Java heap space. As far as I understand, Spark performs the collect() operation on my master node. Is there any way to increase the memory so that the program can run?

18/03/15 16:35:48 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 5, ip-1443-405-18-1544.us-west-2.compute.internal): java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/03/15 16:35:49 ERROR cluster.YarnScheduler: Lost executor 1 on ip-43ew55-154.us-west-2.compute.internal: remote Akka client disassociated

Solution

  • As you have identified, collect() brings the results back to the driver, so you need to increase the driver memory. The default value of spark.driver.memory is 1 GB, which is insufficient in your case.

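    Set spark.driver.memory to a larger value, such as 2g or 3g. Note that in client mode the driver JVM has already started by the time your application creates its SparkSession/SparkContext, so setting the value in code at that point does not resize the heap. Pass it at launch instead: --driver-memory 3g for 3 GB of memory, as an additional option to spark-submit or spark-shell, or set it in the default properties file.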

    I also suggest reading more about the configuration options described here: https://spark.apache.org/docs/latest/configuration.html
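
One way to check that a larger driver heap actually took effect is to log the JVM's maximum heap from inside the driver before calling collect(). The following is a minimal sketch, not part of Spark: the class name DriverHeapCheck and its two helper methods are hypothetical names chosen for illustration, and the only API it relies on is the standard Runtime.maxMemory() call.

```java
// Hypothetical helper (not a Spark class): reports the max heap of the JVM it runs in.
class DriverHeapCheck {

    /** Converts a byte count to whole mebibytes, rounding down. */
    public static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    /** Maximum heap the current JVM will attempt to use, in MiB. */
    public static long maxHeapMiB() {
        return toMiB(Runtime.getRuntime().maxMemory());
    }

    public static void main(String[] args) {
        // When this runs inside the driver process, the value should
        // roughly track the --driver-memory setting used at launch.
        System.out.println("Driver max heap (MiB): " + maxHeapMiB());
    }
}
```

Calling DriverHeapCheck.maxHeapMiB() at the start of the driver program and logging the result shows whether the setting was picked up. The reported figure is often somewhat below the configured amount, so treat it as an approximate check rather than an exact match.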