apache-spark, pyspark, spark-streaming

PySpark java.lang.OutOfMemoryError with wholeTextFiles


I have 1160 XML files, each about 300 MB in size, in data_dir, and I want to count the total number of words. I have a local machine with 256 cores and 256 GB of RAM. Since the whole data set is roughly 350 GB, I know I likely cannot process all the files at once. How can I do batch processing or partitioning to avoid the java.lang.OutOfMemoryError: Java heap space error? This is my code:

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    data_dir = '/shared/hm31/xml_data/'

    conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
    conf.set("spark.driver.memory", "200g")
    conf.set("spark.executor.memory", "6g")  # Example: increase executor memory

    sc = SparkContext(conf=conf)

    # wholeTextFiles returns (path, content) pairs, so split the content element
    rdd = sc.wholeTextFiles(data_dir, 500)
    words = rdd.flatMap(lambda kv: kv[1].split())
    print("words", words.count())

Update: Even after using 200 GB of RAM and 3 CPUs, I still get an out-of-memory error.

This is my code:

from pyspark import SparkConf, SparkContext

data_dir = '/shared/hm31/xml_data/'

conf = SparkConf().setMaster("local[3]").setAppName("WordCount")
conf.set("spark.driver.memory", "200g")

sc = SparkContext(conf=conf)

# wholeTextFiles returns (path, content) pairs, so split the content element
rdd = sc.wholeTextFiles(data_dir, 512)
words = rdd.flatMap(lambda kv: kv[1].split())
print("Words", words.count())

And this is the error I get:

23/10/03 13:24:28 ERROR Utils: Uncaught exception in thread stdout writer for python3
java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
        at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
        at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
        at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
        at org.sparkproject.guava.io.ByteStreams.copy(ByteStreams.java:211)
        at org.sparkproject.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:79)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:65)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:729)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:435)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$1407/0x000000084089c040.apply(Unknown Source)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:270)
[the same java.lang.OutOfMemoryError: Java heap space stack trace repeats for the other "stdout writer for python3" threads]

Solution

  • If you want to keep it simple, work through the following steps in order:

    1. You're trying to set the spark.driver.memory configuration after your application has already started, which has no effect: the driver JVM is already running at that point. Set this parameter in the command that starts your application, for example pyspark --driver-memory 200g ... or spark-submit --driver-memory 200g ... (see the sketch after this list). Without it you're running with the default of 1g of driver memory.

    2. You're using .setMaster("local[*]"), which means you're running Spark in local mode. In local mode the driver and the executor are the same JVM, so spark.executor.memory is ignored. Therefore:

      • remove spark.executor.memory
      • make spark.driver.memory as large as possible (something like 240g, depending on what else runs on the machine)
    3. If you still get out-of-memory errors after the previous steps, your workload doesn't fit in the RAM available per core. This depends entirely on your data (and the biggest files in it). In that case, lower the number of CPUs in use so that each core has more RAM to work with. For example, to limit the calculation to 128 CPUs (instead of [*], which means all available CPUs), do the following:

    .setMaster("local[128]")