
Scala PartialFunction StackOverflowError


I'm developing a Scala / Python library called PySpark Cassandra. In it I have to deal with Python objects serialized in the pickle format, e.g. when saving data.

I have a job which fails with a StackOverflowError:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 250 in stage 2.0 failed 4 times, most recent failure: Lost task 250.3 in stage 2.0 (TID 411, sp-prod-adg02.priv.tgho.nl): java.lang.StackOverflowError
        at pyspark_cassandra.UnpickledUUIDConverter$$anonfun$convertPF$1.applyOrElse(Pickling.scala:121)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:165)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        ...
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:166)

The code which starts this trace is:

object UnpickledUUIDConverter extends TypeConverter[UUID] {
  val tt = typeTag[UUID]
  def targetTypeTag = tt
  def convertPF = { case holder: UUIDHolder => holder.uuid }
}

located at https://github.com/TargetHolding/pyspark-cassandra/blob/master/src/main/scala/pyspark_cassandra/Pickling.scala#L118 (for more details and context).

The UUIDHolder class is defined as:

import java.math.BigInteger
import java.nio.ByteBuffer
import java.util.{HashMap, UUID}

class UUIDHolder {
  var uuid: UUID = null

  // Called by the unpickler with the pickled UUID's state; rebuilds the
  // UUID from the 128-bit integer stored under the "int" key.
  def __setstate__(values: HashMap[String, Object]): UUID = {
    val i = values.get("int").asInstanceOf[BigInteger]
    val buffer = ByteBuffer.wrap(i.toByteArray())
    uuid = new UUID(buffer.getLong(), buffer.getLong())
    uuid
  }
}

(the unusual shape of this class is for compatibility with py4j and with how Python pickles UUID objects)

But my understanding of Scala, and of the relationship between case blocks and PartialFunctions, is rather limited, in particular how my case block relates to https://github.com/scala/scala/blob/2.10.x/src/library/scala/PartialFunction.scala#L166 (I'm running on Scala 2.10.5).

To worsen my situation :) I have difficulty reproducing the error consistently. It occurs in a Spark job on different nodes, but not all the time. I have a dataset for which the issue occurs when saving it, but I can't pin it down to a particular record.

I would not expect a StackOverflowError in any case with this code. Any help would be very much appreciated!


Solution

To answer the simple bits:

Your case block is a partial function literal, aka a pattern-matching anonymous function. That works because the return type of convertPF (declared in TypeConverter) is a partial function.
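
For instance, the same mechanism in isolation (a minimal sketch, unrelated to the library):

// The case block compiles to a PartialFunction: the compiler synthesizes
// isDefinedAt and apply (and applyOrElse) from the patterns.
val convert: PartialFunction[Any, String] = { case s: String => s.toUpperCase }

convert.isDefinedAt("hi") // true
convert.isDefinedAt(42)   // false
convert("hi")             // "HI"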

It gets a synthesized applyOrElse, which avoids invoking isDefinedAt and then apply.
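
Roughly, applyOrElse tries the patterns and falls back to a default in a single call (again a minimal sketch):

val convert: PartialFunction[Any, String] = { case s: String => s.toUpperCase }

// One call instead of two: if a pattern matches, its body runs;
// otherwise the default function is applied to the argument.
convert.applyOrElse(42, (x: Any) => s"no match for $x") // "no match for 42"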

The OrElse on your stack is just what wraps pf1 orElse pf2. Its implementation of applyOrElse delegates to each PartialFunction.

A very long chain pf1 orElse pf2 orElse ... orElse pfN could overflow the stack on evaluation, as could a nested orElse1 orElse (orElse2 orElse ...).
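
A hypothetical repro of that failure mode (not the library's code), producing the same repeated OrElse.applyOrElse frames as the trace above:

val pf: PartialFunction[Int, Int] = { case n if n > 0 => n }

// foldLeft builds a left-nested chain ((pf orElse pf) orElse pf) ...,
// one OrElse wrapper per step.
val deep = (1 to 100000).foldLeft(pf)((acc, _) => acc orElse pf)

// Evaluation recurses through every OrElse.applyOrElse frame
// (PartialFunction.scala:166 on Scala 2.10) before it can reach the
// innermost case, and throws java.lang.StackOverflowError.
deep(1)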