Search code examples
scalaapache-sparkapache-spark-sqlcluster-computingparquet

Serialization issues when connecting to Spark cluster


I have a Spark app written in Scala that is writing and reading from Parquet files. The app exposes an HTTP API, and when it receives requests, sends work to a Spark cluster through a long-lived context that is persisted through the app's life. It then returns the results to the HTTP client.

This all works fine when I'm using a local mode, with local[*] as master. However, as soon as I'm trying to connect to a Spark cluster, I'm running into serialization issues. With Spark's default serializer, I get the following:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.FilterExec.otherPreds of type scala.collection.Seq in instance of org.apache.spark.sql.execution.FilterExec.

If I enable Kryo serializer, I get java.lang.IllegalStateException: unread block data.

This happens when trying to read from the Parquet files, however I don't believe it has anything to do with the Parquet files themselves, simply with the serialization of the code that's being sent over to the Spark cluster.

From a lot of internet searches, I've gathered that this could be caused by incompatibilities between Spark versions, or even Java versions. But the versions being used are identical.

The app is written in Scala 2.12.8 and ships with Spark 2.4.3. The Spark cluster is running Spark 2.4.3 (the version compiled with Scala 2.12). And the machine on which both the Spark cluster and the app are running is using openJDK 1.8.0_212.

According to another internet search, the problem could have been because of a mismatch in the spark.master URL. So I've set spark.master in spark-defaults.conf to the same value I'm using within the app to connect to it.

However, this hasn't solved the issue and I am now running out of ideas.


Solution

  • I am not entirely sure what the underlying explanation is, but I fixed it by copying my application's jar into Spark's jars directory. Then I was still encountering an error, but a different one: something about a Cats/kernel/Eq class missing. So I added cats-kernel's jar into Spark's jars directory.

    And now everything works. Something I read in another Stack Overflow thread may explain it:

    I think that whenever you do any kind of map operation using a lambda which is referring to methods/classes of your project, you need to supply them as an additional jar. Spark does serializes the lambda itself, but is not pulling together its dependencies. Not sure why the error message is not informative at all.