Search code examples
amazon-web-servicesapache-sparkhadoop-yarnamazon-emr

Running multiple similar spark submit jobs on yarn


I have multiple spark submit jobs, which needs to be submitted as a Step on AWS EMR cluster, data input for my steps is same for all the steps, which is currently located in S3 bucket and output is also on s3, and all steps need to be executed in parallel.

Currently AWS EMR does not support parallel execution of spark-submits, but there is a way that is through changing EMR’s YARN’s scheduler to FAIR Scheduler, but problem with this approach is that we need to manually define Cores /Memory for executors that is an overhead.

Searching for best possible way to handle this scenario in AWS EMR, any suggestions ????

Thanks, -Jack


Solution

  • Currently AWS EMR does not support parallel execution of spark-submit

    Actually it is supported but a bit tricky. There are two main issues with this, the number of executors used by each Spark job and amount of memory for driver process on the master node.

    If you know the max number of simultaneous spark-submits executions in advance (N), you can play with --driver-memory, --num-executors, --executor-cores and --executor-memory parameters to make sure each job takes only 1/N of available resources (or more if it is ok to wait till the jobs submitted earlier finish). This is very inefficient if this parallel execution is rarely happens in practice.

    A better approach is to use dynamic allocation: https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

    There are quite few parameters to play with but it will allow single job to utilize almost all available resources when no other jobs are running (if maxExecutors is the spark.dynamicAllocation.maxExecutors is the max available number of executors for a given set of parameters) and fall back to share executors evenly among the jobs if more than one are running. This rebalancing is not happening immediately though as the running tasks are not interrupted. Also, for this to work, all spark-submit calls must use dynamic allocation. If one of spark-submit calls doesn't use dynamic allocation, it is still able to grab all the resources and starve the rest of Spark jobs.

    Here is an example of dynamic allocation parameters:

    spark-submit \
        --deploy-mode client \
        --master yarn \
        --conf spark.dynamicAllocation.enabled=true \
        --conf spark.shuffle.service.enabled=true \
        --conf spark.dynamicAllocation.executorIdleTimeout=60s \
        --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s \
        --conf spark.dynamicAllocation.minExecutors=1 \
        --conf spark.dynamicAllocation.maxExecutors=4 \
        --conf spark.dynamicAllocation.initialExecutors=1 \
        <other Spark options>
    

    For a cluster with 8 nodes:

    • 1 job - 4 executors
    • 2 jobs - 4 executors for each
    • 3 jobs - 2-3 executors for each
    • 4 jobs - 2 executes for each
    • 5-8 jobs - 1-2 executors for each
    • more than 8 jobs - some jobs are going to wait, the rest will get 1 executor each.

    FAIR scheduling also works and it allows to utilize cluster resources even better than dynamic allocation (as it doesn't wait till executorIdleTimeout or cachedExecutorIdleTimeout is expired) but it only works for multiple parallel jobs initiated by different threads in the same Spark App (in the same SparkContext) : https://spark.apache.org/docs/latest/job-scheduling.html.

    Some app architecture changes may be required to enable this. E.g. the app can read processing requests from a queue or expose a web service to receive them.