apache-spark pyspark google-cloud-dataproc

Spark loses all executors one minute after starting


I run pyspark on an 8-node Google Dataproc cluster with default settings. A few seconds after starting, I see 30 executor cores running (as expected):

    >>> sc.defaultParallelism
    30

One minute later:

    >>> sc.defaultParallelism
    2

From that point on, all actions run on only 2 cores:


    >>> rng = sc.parallelize(range(1,1000000))
    >>> rng.cache()
    >>> rng.count()
    >>> rng.getNumPartitions()
    2

If I run rng.cache() while the cores are still connected, they stay connected and jobs get distributed.

Checking the monitoring app (port 4040 on the master node) shows that the executors are removed:

    Executor 1
    Removed at 2016/02/25 16:20:14
    Reason: Container container_1456414665542_0006_01_000002 exited from explicit termination request.

Is there a setting that keeps the cores connected without workarounds?


Solution

  • For the most part, what you are seeing is just the difference between how Spark on YARN can be configured and how Spark standalone behaves. At the moment, YARN's reporting of "VCores Used" does not correspond to a real container reservation of cores; containers are sized based on the memory reservation alone.

    Overall there are a few things at play here:

    Dynamic allocation causes Spark to relinquish idle executors back to YARN, and unfortunately at the moment Spark prints that spammy but harmless "lost executor" message when it does so. Dynamic allocation exists to address the classic problem of Spark on YARN, where Spark would paralyze the clusters it ran on because it grabbed the maximum number of containers it thought it needed and then never gave them up.

    With dynamic allocation, when you start a long job, Spark quickly allocates new containers (with something like an exponential ramp-up, so it can fill a full YARN cluster within a couple of minutes). When idle, it relinquishes executors with a similar ramp-down at an interval of about 60 seconds (if idle for 60 seconds, relinquish some executors).
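The ramp-up and idle release described above can be illustrated with a toy model. This is only a sketch in plain Python, not Spark's actual scheduler code: the function names `ramp_up` and `idle_executors` are hypothetical, the doubling batch size is an assumption standing in for "something like exponential ramp-up", and the 60-second default mirrors the idle interval mentioned above (which corresponds to the `spark.dynamicAllocation.executorIdleTimeout` property).

```python
def ramp_up(target, start=1):
    """Cumulative executor count after each request round.

    Assumption: each round asks for twice as many executors as the
    previous one, capped at the target.
    """
    counts = []
    total = 0
    batch = start
    while total < target:
        total = min(total + batch, target)
        counts.append(total)
        batch *= 2
    return counts


def idle_executors(last_task_finished, now, idle_timeout=60.0):
    """Ids of executors that have been idle for at least idle_timeout seconds.

    last_task_finished maps executor id -> time (in seconds) its last task ended.
    """
    return sorted(
        executor_id
        for executor_id, finished_at in last_task_finished.items()
        if now - finished_at >= idle_timeout
    )


# Reaching 30 executors takes only a handful of rounds.
print(ramp_up(30))

# Executor "1" finished its last task 70 s ago, executor "2" only 20 s ago.
print(idle_executors({"1": 0.0, "2": 50.0}, now=70.0))
```

In this model an idle shell session loses every executor shortly after the timeout passes, which matches the behavior in the question: nothing ran for a minute, so the executors were handed back to YARN.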

    If you want to disable dynamic allocation you can run:

        spark-shell --conf spark.dynamicAllocation.enabled=false

        gcloud dataproc jobs submit spark --properties spark.dynamicAllocation.enabled=false --cluster <your-cluster> foo.jar
    

    Alternatively, if you specify a fixed number of executors, it should also automatically disable dynamic allocation:

        spark-shell --conf spark.executor.instances=123

        gcloud dataproc jobs submit spark --properties spark.executor.instances=123 --cluster <your-cluster> foo.jar
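Since the question uses pyspark rather than spark-shell, the same properties can presumably be passed to the `pyspark` launcher, which accepts the same `--conf key=value` form. The following is a minimal sketch that only builds the command line; `conf_flags` is a hypothetical helper written for illustration, not part of Spark or gcloud.

```python
def conf_flags(properties):
    """Turn a dict of Spark properties into a flat list of --conf arguments."""
    flags = []
    for key, value in sorted(properties.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


# Disable dynamic allocation so executors are kept while the shell is idle.
properties = {"spark.dynamicAllocation.enabled": "false"}

command = ["pyspark"] + conf_flags(properties)
print(" ".join(command))
```

The printed line can be pasted into a terminal on the master node, or the `command` list can be handed to `subprocess.run` if the shell is launched from a script.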