Search code examples
pythonamazon-web-servicespysparkamazon-emr

An import issue gets pySpark on AWS stuck in Running status


In short: I run a pySpark application on AWS's EMR. When I map an rdd using a custom function that resides in an external module in an external package (shipped inside a .zip file as --py-files) the cluster gets stuck - the Running status is kept while no more log lines appear until I manually terminate it.

What it is not: It is not a proper import exception - as this would have terminated the application upon executing the import lines, raising the appropriate exception, which does not happen. Also, as seen below, calling a function that maps with a similar function as a lambda, when the called function resides in the "problematic" module - works.

What it is: Only when the program tries to use a function from that module as a mapping function in a transformation that is written in the main program does the bug occur. Additionally, if I remove the import line highlighted in the external file (the "problematic" module) - an import that is never used there in this minimal bug-reproduction context (but in the actual context it is used) - the bug ceases to exist.

Below is the code for the minimal example of the bug, including commenting of 2 important lines, and some technical info. Any help would be appreciated.

Here is the main program:

import spark_context_holder
from reproducing_bugs_external_package import reproducing_bugs_external_file


sc = spark_context_holder.sc
log = spark_context_holder.log


def make_nums_rdd():
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)

log.warn("Starting my code!")
sum = sc.parallelize([1,2,3]*300).map(lambda x: x*x/1.45).sum()
log.warn("The calculated sum using in-line expression, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(sum))
simple_sum_rdd = make_nums_rdd()
log.warn("The calculated sum using the in-file function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = reproducing_bugs_external_file.make_nums_rdd(sc)
log.warn("The calculated sum using the external file's function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
simple_sum_rdd = sc.parallelize([1,2,3]*300).map(reproducing_bugs_external_file.calc_func)
log.warn("The calculated sum using the external file's mapping function, which doesn't mean anything more than 'succeeded in carrying out the calculation on the cluster', is {}!".format(simple_sum_rdd.sum()))
# This last line does not get logged, while the others up until this one do. Here the cluster gets stuck on Running status without outputting any more log lines

In the zip file shipped as --py-files I have the following structure:

> spark_context_holde.py
> reproducing_bugs_external_package
  >> __init__.py
  >> reproducing_bugs_external_file.py

And here are their respective contents:

spark_context_holder.py

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("kac_walk_experiment")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
log4jLogger = sc._jvm.org.apache.log4j
log = log4jLogger.LogManager.getLogger("dbg_et")

# sc.setLogLevel("ALL")

def getParallelismAlternative():
    return int(sc.getConf().get('spark.cores.max'))

__init__.py

from . import reproducing_bugs_external_file

__all__ = [reproducing_bugs_external_file]

reproducing_bugs_external_file.py

import numpy
import spark_context_holder  # If this is removed - the bug stops!


def make_nums_rdd(sc):
    return sc.parallelize([1, 2, 3] * 300).map(lambda x: x * x / 1.45)


def calc_func(x):
    return x*x/1.45

More technical details:

  • Release label:emr-5.17.0
  • Hadoop distribution:Amazon 2.8.4
  • Applications:Spark 2.3.1
  • using python3.4 which is the 3 version installed on AWS's machines to date

Solution

  • I think your fundamental problem is that you're taking a bunch of Pyspark setup code that's meant to be run only on the master node and running it on the slave nodes instead. There's no reason why these lines:

    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setAppName("kac_walk_experiment")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    log4jLogger = sc._jvm.org.apache.log4j
    log = log4jLogger.LogManager.getLogger("dbg_et")
    

    should be in an external module in the first place, and they definitely shouldn't be in a module that you're zipping and exporting to the slave nodes via --py-files. That will certainly cause a lot of undefined behavior, probably up to and including the hanging bug that you're getting.

    Move the above lines to your main program and everything should be fine. You'll also have to rewrite any remaining code in spark_context_holder.py to match. For example, getParallelismAlternative will have to take sc as an argument:

    def getParallelismAlternative(sc):
        return int(sc.getConf().get('spark.cores.max'))