Tags: apache-spark, apache-spark-sql, sparkr

sparkR gapply SLOW compared with SQL


I have a data set of ~8 GB with ~10 million rows (about 10 columns) and wanted to prove the point that SparkR could outperform SQL. On the contrary, I see extremely poor performance from SparkR compared with SQL.

My code simply loads the file from S3, then runs gapply, where my groupings will typically consist of 1-15 rows, so 10 million rows divided by 15 gives a lot of groups. Am I forcing too much shuffling and serialization/deserialization? Is that why things run so slowly?

For purposes of illustrating that my build_transition function is not the performance bottleneck, I created a trivial version called build_transition2 as shown below, which returns dummy information with what should be constant execution time per group.

Is there anything fundamentally or obviously wrong with my solution formulation?

build_transition2 <- function(key, x) {
  # Ignore the rows of the group (x) and return a single dummy row,
  # so the work done per group is constant.
  data.frame(patient_id = 1234L,
             seq_val = 5678L,
             stringsAsFactors = FALSE
             )
}


dat_spark <- read.df("s3n://my-awss3/data/myfile.csv", "csv", header = "true", inferSchema = "true", na.strings = "NA")


schema <- structType(structField("patient_ID","integer"),
                     structField("sequence","integer")
                     )


result <- gapply(dat_spark, "patient_encrypted_id", build_transition2, schema)

Solution

  • "and wanted to prove the point that SparkR could outperform SQL."

    That's just not the case. Using a guest language adds a huge overhead of indirection, because the data has to go through this chain of conversions:

    • Internal Catalyst format
    • External Java type
    • Sending data to R
    • ....
    • Sending data back to JVM
    • Converting to Catalyst format

    On top of that, gapply is basically an example of group-by-key, something that we normally avoid in Spark because every row of every group has to be shuffled, with no partial aggregation beforehand.
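
For contrast, here is a minimal sketch of a per-group computation expressed with native SparkR aggregation instead of gapply. The toy data, the `value` column, and the local master are assumptions made up for illustration; only `patient_encrypted_id` comes from the question. Built-in aggregates like these stay inside the JVM and never ship rows to an R worker.

```r
library(SparkR)

# Assumption: a local Spark installation is available for this sketch.
sparkR.session(master = "local[*]")

# Hypothetical toy data standing in for the real 10-million-row file.
local_df <- data.frame(
  patient_encrypted_id = c("a", "a", "b", "b", "b"),
  value = c(1, 2, 3, 4, 5),
  stringsAsFactors = FALSE
)
dat_spark <- createDataFrame(local_df)

# Native aggregation: row count and sum per patient, computed by Spark SQL
# without calling any R function on the executors.
per_patient <- agg(
  groupBy(dat_spark, "patient_encrypted_id"),
  n_rows = n(dat_spark$value),
  total = sum(dat_spark$value)
)

head(arrange(per_patient, "patient_encrypted_id"))
```

Whether the real per-group logic can be reduced to aggregates like this depends on what build_transition actually does, which the question does not show.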

    Overall, gapply should be used if, and only if, the business logic cannot be expressed using standard SQL functions. It is definitely not a way to optimize your code under normal circumstances (there might be border cases where it is faster, but in general any special logic, if required, will benefit more from native JVM execution with a Scala UDF, UDAF, Aggregator, or reduceGroups / mapGroups).
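
As an illustration of expressing per-group logic with standard SQL functions, the sketch below derives "previous state to current state" pairs per patient with a window function rather than gapply. It assumes the real logic resembles a transition between consecutive rows, which is only a guess based on the function name; the `visit_seq` and `state` columns and the toy data are hypothetical.

```r
library(SparkR)

# Assumption: a local Spark installation is available for this sketch.
sparkR.session(master = "local[*]")

# Hypothetical event data: one row per patient visit, with an ordering column.
events <- createDataFrame(data.frame(
  patient_encrypted_id = c("a", "a", "a", "b", "b"),
  visit_seq = c(1L, 2L, 3L, 1L, 2L),
  state = c("admit", "icu", "discharge", "admit", "discharge"),
  stringsAsFactors = FALSE
))

# Window over each patient's rows, ordered by visit sequence.
ws <- orderBy(windowPartitionBy("patient_encrypted_id"), "visit_seq")

# lag() looks one row back within the window; the first row of each
# patient has no predecessor, so its prev_state is NULL.
transitions <- withColumn(events, "prev_state", over(lag(events$state, 1), ws))

# Keep only rows that actually describe a transition.
transitions <- filter(transitions, isNotNull(transitions$prev_state))

head(arrange(transitions, "patient_encrypted_id", "visit_seq"))
```

The window still requires a shuffle by patient, but the rows never leave the JVM, so the conversion and R round-trip costs listed above are avoided.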