Tags: scala, apache-spark, rdd

Spark 2.3.1 => 2.4 increases runtime 6-fold


Our cloud team is forcing me onto a newer EMR version (5.23.1, 5.27.1, or 5.32+), which moves me up from 5.17.0 with Spark 2.3.1 to Spark 2.4.x. The impetus is to allow a security configuration that forbids Instance Metadata Service Version 1 (although I've tested it without any security configuration attached, and I also tested 5.20.1, which has no option for a security configuration and also runs Spark 2.4.0).

The runtime of a simple ETL job increases 6x on Spark 2.4 (compared to 2.3.1) with no code changes other than the Spark version. There are leftOuterJoins on big RDDs in 3 of the 4 stages with the biggest slowdowns.
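
For context, the joins in those stages have roughly the shape below. This is only a simplified sketch; the type and field names are hypothetical, not the real ones.

    import org.apache.spark.rdd.RDD

    // Hypothetical types and names, just to show the shape of the slow stages:
    // plain pair-RDD leftOuterJoins keyed by a site identifier.
    case class Record(siteId: String, payload: String)
    case class SiteMeta(siteId: String, region: String)

    def enrich(records: RDD[Record], meta: RDD[SiteMeta]): RDD[(Record, Option[SiteMeta])] =
      records.keyBy(_.siteId)
        .leftOuterJoin(meta.keyBy(_.siteId))
        .values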

I get no errors, just a 6x increase in time/cost. All code is compiled w/ Java 8.

EDIT

Confusingly, this is one snippet of offending code where I can reproduce the problem in spark-shell, even though it does very little in the test run (because the if condition evaluates to false). No joins, no pulling data off disk... it just takes an existing RDD that has already been materialized, gives it a new name, and persists it to disk. I persist other RDDs to disk with no problem. In EMR 5.17 this snippet takes 4.6 minutes; in 5.23.1 it takes 20 minutes. (A rough timing sketch follows this edit.)

    val rddWSiteB: RDD[StagedFormat] = {
      if (false) {          // <-- evaluates some stuff that's false
        val site5gnsaLookup = new Site5gnsaLookup(spark, req)
        site5gnsaLookup.rddWithSiteMeta(rddWSite)
      } 
      else {
        rddWSite // <-- this is all that's happening; literally nothing
      }
    }

    rddWSiteB.setName("GetExternalMeta.rddWSiteB")

    // THIS is the problem
    // WHY does serializing/persisting to disk take 10x as long 
    //   in 2.4 vs 2.3?
    rddWSiteB.persist(StorageLevel.DISK_ONLY)
    rddWSiteB.count

END EDIT
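
To isolate where the time goes, this is roughly the spark-shell timing I use on the persist step itself. It's only a sketch, and it assumes rddWSiteB from the snippet above is in scope.

    import org.apache.spark.storage.StorageLevel

    // Crude wall-clock timer; rddWSiteB is the already-materialized RDD from the
    // snippet above, so this mostly measures serialization plus the disk write.
    def timed[T](label: String)(body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
      result
    }

    rddWSiteB.persist(StorageLevel.DISK_ONLY)
    timed("DISK_ONLY persist + count")(rddWSiteB.count())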

I've read the Cloudera 2.3 => 2.4 migration guide and nothing seems relevant. In everything else I can find from Databricks and blogs, most of the changes seem to affect SQL and DataFrames, but I read JSON and CSV text straight into RDDs.

I'm at a loss. With no errors I don't really know how to fix this, and I can't imagine any logical reason for a 6x increase in runtime. I'm not sure what to do next or what's going on. Any ideas on how to troubleshoot?

Lastly, I don't think my config is the problem, but in the interest of throwing a bunch of stuff out here in the absence of anything directly useful, I use the following config.

    [
      {
        "Classification": "spark-defaults",
        "Properties": {
          "spark.dynamicAllocation.enabled": "false",
          "spark.executor.instances": "284",
          "spark.executor.memory": "35977M",
          "spark.executor.memoryOverhead": "4497M",
          "spark.executor.cores": "5",
          "spark.driver.memory": "51199M",
          "spark.driver.memoryOverhead": "5119M",
          "spark.driver.cores": "15",
          "spark.default.parallelism": "4245",
          "spark.shuffle.compress": "true",
          "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
          "spark.driver.maxResultSize": "0",
          "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
          "spark.network.timeout": "600s",
          "spark.rpc.message.maxSize": "512",
          "spark.scheduler.listenerbus.eventqueue.capacity": "100000",
          "spark.kryoserializer.buffer.max": "256m"
        }
      },
      {
        "Classification": "core-site",
        "Properties": {
          "fs.s3a.endpoint": "s3.amazonaws.com"
        }
      },
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.enableServerSideEncryption": "true",
          "fs.s3.maxRetries": "20"
        }
      },
      {
        "Classification": "spark-env",
        "Properties": {},
        "Configurations": [
          {
            "Classification": "export",
            "Properties": {
               <Env Variables Removed>
            }
          }
        ]
      },
      {
        "Classification": "spark-log4j",
        "Properties": {
          "log4j.rootCategory": "INFO, console",
          "log4j.logger.com.tmobile": "DEBUG",
          "log4j.appender.console.target": "System.err",
          "log4j.appender.console": "org.apache.log4j.ConsoleAppender",
          "log4j.appender.console.layout": "org.apache.log4j.EnhancedPatternLayout",
          "log4j.appender.console.layout.ConversionPattern": "%d{yyyy/MM/dd HH:mm:ss} [%10.10t] %-5p %-30.30c: %m%n",
          "log4j.logger.com.amazonaws.latency": "WARN",
          "log4j.logger.org": "WARN"
        }
      },
      {
        "Classification": "yarn-site",
        "Properties": {
          "yarn.nodemanager.pmem-check-enabled": "false",
          "yarn.nodemanager.vmem-check-enabled": "false"
        }
      }
    ]
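
One sanity check worth mentioning: EMR layers its own defaults on top of the classifications above, and those defaults shift between releases, so dumping the effective conf on both versions and diffing it can catch settings EMR changed underneath me. A quick spark-shell sketch:

    // Print the effective Spark conf, sorted, so runs on EMR 5.17 and 5.23+ can
    // be diffed for settings EMR changed underneath the classification above.
    spark.sparkContext.getConf.getAll
      .sortBy(_._1)
      .foreach { case (k, v) => println(s"$k=$v") }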

Solution

  • This turned out to be a matter of persisting to StorageLevel.DISK_ONLY while using EBS storage. The time to cache increased about 10x with the move from EMR 5.17 to 5.36.1 (Spark 2.3.1 to 2.4.8) on memory-optimized EC2 instances (r5.24xlarge) backed by EBS.

    My solution was to move to storage-optimized instances (i4i.32xlarge), which have built-in SSD arrays with much faster write speeds.

    My jobs run faster now, with the faster caching speeds... but the instances are about 2x more expensive, so overall my cost is up 40%.
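
    If switching instance families isn't an option, a cheaper thing to try first is keeping the cache off the EBS write path with a serialized in-memory storage level. This is only a sketch, and it assumes the serialized data mostly fits in executor memory:

        import org.apache.spark.storage.StorageLevel

        // Sketch only: serialized blocks stay in executor memory and spill to
        // disk just for partitions that don't fit, so EBS writes are reduced
        // compared to StorageLevel.DISK_ONLY.
        rddWSiteB.persist(StorageLevel.MEMORY_AND_DISK_SER)
        rddWSiteB.count()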