
Task not Serializable immutable.MapLike$$anon$2 - Scala


I have a question on how two seemingly identical structures in Scala can yield different results on attempted Serialization.

My issue is part of a bigger problem related to Spark DataFrame creation, but I have managed to isolate it to a nested structure that, once converted into Row objects, is not serializable.

I used the following block of code to find the root of my problem, but even with that I have no idea why it fails.

  import java.io.{ByteArrayOutputStream, ObjectOutputStream}

  // Try to Java-serialize p and report the outcome
  try {
    val byteOut = new ByteArrayOutputStream()
    val objOut = new ObjectOutputStream(byteOut)
    objOut.writeObject(p)
    objOut.close()
    println("Serialization successful")
  } catch {
    case e: Exception => e.printStackTrace()
  }

working p:

Record(
    Map(
        language -> Project(Variable(x1,RecordCType(Map(language -> StringType, info -> BagCType(RecordCType(Map(users -> StringType, difficulty -> IntType, average_review -> DoubleType)))))),language), 

        info -> Project(Variable(x1,RecordCType(Map(language -> StringType, info -> BagCType(RecordCType(Map(users -> StringType, difficulty -> IntType, average_review -> DoubleType)))))),info)))

non-serializable p:

Record(
    Map(
        lName -> Project(Variable(x5,RecordCType(Map(s1_index -> LongType, nested -> BagCType(RecordCType(Map(users -> IntType, inUse -> BoolType))), userNo -> IntType, lName -> StringType))),lName), 
        
        nested -> Project(Variable(x5,RecordCType(Map(s1_index -> LongType, nested -> BagCType(RecordCType(Map(users -> IntType, inUse -> BoolType))), userNo -> IntType, lName -> StringType))),nested)))

I'm aware there are some small differences between the Records but in terms of structure they are more or less identical. The stack trace points to:

    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(users -> IntType, inUse -> BoolType))

But if that map is not serializable in the second record, then why can

Map(users -> StringType, difficulty -> IntType, average_review -> DoubleType)

be serialized with no issues in the first one? To me there is no difference between the two.

I've added some of the classes used in the question below to give context:

case class Record(fields: Map[String, CExpr]) extends CExpr {
  
  def tp: RecordCType = {
    RecordCType(fields.map(f => f._1 -> f._2.tp))
  }
}

case class Project(e1: CExpr, field: String) extends CExpr

case class Variable(name: String, override val tp: Type) extends CExpr

final case class BagCType(tp: Type) extends Type

final case class RecordCType(attrTps: Map[String, Type]) extends Type

sealed trait Type extends Serializable 

trait CExpr { self =>

  def tp: Type
  def vstr: String = self.toString
  val isCacheUnfriendly: Boolean = false

}

I must be missing or misunderstanding something about serialization in Scala. Can anyone see my issue or point me in the right direction?


Solution

  • You've hit https://github.com/scala/bug/issues/7005 and, as per the Spark-related comments there, you are likely using a view without knowing it: the value prints like an ordinary Map, but its runtime class is a lazy wrapper. Using Seq can sometimes produce underlying Streams that suffer from the same kind of issue.

    The "solution" is to call .map(identity) on any map that has this issue, which forces a real map to be created. (For a Seq, call .toVector to make sure.)
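
As a minimal sketch of the fix described above, assuming Scala 2.11 or 2.12, where mapValues returns a lazy wrapper rather than a strict Map: the names MapViewSerialization, isSerializable, base, lazyView and strict are hypothetical and only serve the illustration. The question does not show where the view is created, so mapValues is used here purely as one common way to end up with one.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object MapViewSerialization {

  // Hypothetical helper: true if plain Java serialization of obj succeeds.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // An ordinary strict immutable Map.
  val base = Map("users" -> 1, "inUse" -> 2)

  // Assumption: on Scala 2.11/2.12 this is a lazy wrapper around base.
  // It prints like a normal Map, but its runtime class differs.
  val lazyView = base.mapValues(_.toString)

  // Forcing the wrapper rebuilds a strict collection from its entries.
  val strict = lazyView.map(identity)

  def main(args: Array[String]): Unit = {
    println(s"lazyView: ${lazyView.getClass.getName}, serializable = ${isSerializable(lazyView)}")
    println(s"strict:   ${strict.getClass.getName}, serializable = ${isSerializable(strict)}")
  }
}
```

Comparing the two printed class names is a quick way to tell whether a given map is a wrapper, since toString alone does not reveal it. Applying .map(identity) at the point where the map is stored, for example when building a RecordCType, keeps the wrapper from reaching the serializer at all.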