Search code examples
apache-sparkserializationapache-spark-sqlapache-spark-encoders

How is using encoders so much faster than java serialization?


How is using encoders so much faster than java and kryo serialization?


Solution

    • Because Encoders sacrifice generality for performance. The idea is not new. Why Kryo is faster than Java serialization? For the same reason. Consider this transcript:

      scala> val spark = SparkSession.builder.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer").getOrCreate()
      spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1ed28f57
      
      scala> val map = Map[String, Int]("foo" -> 1).withDefaultValue(0)
      map: scala.collection.immutable.Map[String,Int] = Map(foo -> 1)
      
      scala> map("bar")
      res1: Int = 0
      
      scala> val mapSerDe = spark.sparkContext.parallelize(Seq(map)).first
      mapSerDe: scala.collection.immutable.Map[String,Int] = Map(foo -> 1)
      
      scala> mapSerDe("bar")
      res2: Int = 0
      

      compared to

      scala> val spark = SparkSession.builder.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").getOrCreate()
      spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5cef3456
      
      scala>  val map = Map[String, Int]("foo" -> 1).withDefaultValue(0)
      map: scala.collection.immutable.Map[String,Int] = Map(foo -> 1)
      
      scala> map("bar")
      res7: Int = 0
      
      scala>  val mapSerDe = spark.sparkContext.parallelize(Seq(map)).first
      mapSerDe: scala.collection.immutable.Map[String,Int] = Map(foo -> 1)
      
      scala> mapSerDe("bar")
      java.util.NoSuchElementException: key not found: bar
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        ... 48 elided
      

      (I couldn't find the exact post, but the idea of this example comes from Developers list).

      As you can see, Kryo, although faster, doesn't handle all possible cases. It focuses on the most common ones, and does it right.

      Spark Encoders do the same, but are even less general. If you support only 16 types or so, and don't care about interoperability (must have with real serialization libraries), you have a lot of opportunity to optimize.

    • No need for interoperability allows you to move even further. Encoders for atomic types are just identity. There is no need for any transformations at all.

    • Knowing the schema, as explained by himanshuIIITian is another factor.

      Why does it matter? Because having well defined shape, allows you to optimize serialization and storage. If you know that your data is structured you can switch dimensions - instead of having heterogeneous rows which are expensive to store and access you can use columnar storage.

      Once data is stored in columns you open a whole new set of optimization opportunities:

      • Data access on fixed size fields is extremely fast because you can directly access specific address (remember all the excitement about off-heap / native memory / Tungsten?).
      • You can use a wide range of compression and encoding techniques to minimize the size of the data.

      This ideas are not new either. Columnar databases, storage formats (like Parquet) or modern serialization formats designed for analytics (like Arrow) use the same ideas and often pushed these even further (zero copy data sharing).

    • Unfortunately Encoders are not a silver bullet. Storing non-standard object is a mess, collection Encoders can be very inefficient.