Tags: apache-spark, pyspark

PySpark: setting spark.executor.pyspark.memory introduces errors


I have a PySpark cluster that is hitting OOM errors. To help prevent them, I've tried setting spark.executor.pyspark.memory so that the Python workers don't eat into the JVM's memory (which wouldn't be an issue if Spark allowed -Xms, but that's another issue).
Run environment: AWS SageMaker Processing, Docker (with all code/pip dependencies built in), PySpark, YARN, Python using global pip installs

After setting spark.executor.pyspark.memory, I get the error below. Removing the setting makes it disappear.

Are there any other Spark configs, Python or pip settings, or Spark --py-files/source library copying steps that should be done?
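For reference, this is roughly how the setting gets applied; the session below is an illustrative sketch (the app name is hypothetical and the values mirror spark-defaults.conf further down), not the exact job code. Executor memory settings like these only take effect if they are in place before the application starts (spark-defaults.conf or spark-submit --conf), not on an already-running session.

from pyspark.sql import SparkSession

# Illustrative sketch only: the same memory settings as in spark-defaults.conf below,
# applied at session build time. They must be set before the executors launch.
spark = (
    SparkSession.builder
    .appName("sagemaker-processing-job")  # hypothetical app name
    .config("spark.executor.memory", "103293m")
    .config("spark.executor.memoryOverhead", "34431m")
    .config("spark.executor.pyspark.memory", "34431m")  # the setting that triggers the error
    .getOrCreate()
)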

Error

23:02:10.654 [task-result-getter-0] WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 3.0 in stage 17.0 (TID 237) (algo-19 executor 15): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/pandas/__init__.py", line 22, in <module>
    from pandas.compat import is_numpy_dev as _is_numpy_dev
  File "/usr/local/lib/python3.9/dist-packages/pandas/compat/__init__.py", line 15, in <module>
    from pandas.compat.numpy import (
  File "/usr/local/lib/python3.9/dist-packages/pandas/compat/numpy/__init__.py", line 4, in <module>
    from pandas.util.version import Version
  File "/usr/local/lib/python3.9/dist-packages/pandas/util/__init__.py", line 1, in <module>
    from pandas.util._decorators import (  # noqa:F401
  File "/usr/local/lib/python3.9/dist-packages/pandas/util/_decorators.py", line 14, in <module>
    from pandas._libs.properties import cache_readonly  # noqa:F401
  File "/usr/local/lib/python3.9/dist-packages/pandas/_libs/__init__.py", line 13, in <module>
    from pandas._libs.interval import Interval
ImportError: /usr/local/lib/python3.9/dist-packages/pandas/_libs/interval.cpython-39-x86_64-linux-gnu.so: failed to map segment from shared object
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:119)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$2.hasNext(InMemoryRelation.scala:286)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1601)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1528)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1592)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

spark-defaults.conf

spark.driver.memory=191284m
spark.yarn.am.cores=48
spark.executor.memory=103293m
spark.executor.cores=47
spark.executor.instances=19
spark.driver.maxResultSize=19128m
spark.executor.memoryOverhead=34431m
spark.executor.pyspark.memory=34431m
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.default.parallelism=38
spark.task.cpus=23
spark.sql.execution.arrow.pyspark.enabled=true
spark.sql.execution.arrow.pyspark.fallback.enabled=true
spark.network.timeout=600s
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max=1g
spark.driver.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.eventLog.enabled=true
spark.eventLog.dir=/opt/ml/processing/output
Executor memory: 172155 MB
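As a sanity check, that 172155 figure is exactly the sum of the three executor memory settings above, which (as I understand YARN sizing) is what gets requested per executor container once spark.executor.pyspark.memory is set:

# Rough per-executor container size (MB) when spark.executor.pyspark.memory is set.
# Values are copied from spark-defaults.conf above; treating the sum as the YARN
# container request is my understanding, not something verified against the Spark source.
executor_memory_mb = 103293  # spark.executor.memory
memory_overhead_mb = 34431   # spark.executor.memoryOverhead
pyspark_memory_mb = 34431    # spark.executor.pyspark.memory

container_mb = executor_memory_mb + memory_overhead_mb + pyspark_memory_mb
print(container_mb)  # 172155 -- matches the "Executor memory: 172155 MB" line above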

Solution

  • spark.executor.pyspark.memory seems buggy for my installation (or possibly with a YARN cluster). It acted as though I was passing far too small a memory amount, no matter what value I set it to. The "failed to map segment from shared object" ImportError is consistent with the Python worker's address-space limit being set too low to even mmap pandas' native libraries.

    To get around this, we can explicitly set the PYSPARK_EXECUTOR_MEMORY_MB environment variable on the executors (see https://github.com/apache/spark/blob/f75c7a7b52402e4c8faa39b2f88623e9f0bca916/python/pyspark/worker.py#L1740); a sketch of what the worker does with it follows below.
    spark-defaults.conf example: spark.executorEnv.PYSPARK_EXECUTOR_MEMORY_MB=2048
    Note: this value is MB of memory per task.
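
    Simplified sketch of what the linked worker.py does with this variable (not a verbatim copy of the source): it reads PYSPARK_EXECUTOR_MEMORY_MB and lowers the Python process's address-space rlimit accordingly, which is why it behaves as a hard per-task cap.

    import os
    import resource

    # Simplified sketch of the pyspark worker behaviour (see the GitHub link above):
    # read the per-task memory cap in MB and apply it as an address-space rlimit.
    memory_limit_mb = int(os.environ.get("PYSPARK_EXECUTOR_MEMORY_MB", "-1"))
    if memory_limit_mb > 0:
        new_limit_bytes = memory_limit_mb * 1024 * 1024
        (soft_limit, hard_limit) = resource.getrlimit(resource.RLIMIT_AS)
        # Only tighten the limit; leave it alone if it is already lower.
        if soft_limit == resource.RLIM_INFINITY or new_limit_bytes < soft_limit:
            resource.setrlimit(resource.RLIMIT_AS, (new_limit_bytes, new_limit_bytes))

    The same setting can also be passed at submit time, e.g. spark-submit --conf spark.executorEnv.PYSPARK_EXECUTOR_MEMORY_MB=2048.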