
Serialization of an object used in foreachRDD() when checkpointing


According to this question and the documentation I've read, Spark Streaming's foreachRDD(someFunction) will have someFunction itself executed in the driver process ONLY; any operations performed on the RDDs, however, run on the executors, where the RDDs live.

All of the above holds for me as well, but I noticed that if I turn on checkpointing, Spark seems to try to serialize everything in foreachRDD(someFunction) and ship it somewhere, which causes a problem for me because one of the objects used is not serializable (namely schemaRegistryClient). I tried the Kryo serializer, but no luck there either.

The serialization issue goes away if I turn checkpointing off.

Is there a way to keep Spark from serializing what's used in foreachRDD(someFunc) while still using checkpointing?

Thanks a lot.


Solution

  • Is there a way to keep Spark from serializing what's used in foreachRDD(someFunc) while still using checkpointing?

    Checkpointing shouldn't have anything to do with your problem. The underlying issue is that you have a non-serializable object instance which needs to be shipped to your workers.
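    For concreteness, here is a minimal sketch of the failing shape. The client class, stream, and its element type are hypothetical stand-ins, not the asker's actual types:

    // Hypothetical client holding live state (e.g., an open connection),
    // and therefore not serializable
    class SchemaRegistryClient {
      def send(record: String): Unit = ???
    }

    val client = new SchemaRegistryClient() // created on the driver

    // With checkpointing enabled, the DStream graph (including this closure)
    // is serialized, so the captured `client` triggers a
    // java.io.NotSerializableException
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        iter.foreach(client.send)
      }
    }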

    There is a general pattern in Spark for such a dependency: create an object with a @transient lazy val, which is initialized on the worker nodes when first accessed:

    object RegistryWrapper {
      // Initialized lazily on each executor; never serialized from the driver
      @transient lazy val schemaClient: SchemaRegistryClient = new SchemaRegistryClient()
    }
    

    And when you need to use it inside foreachRDD:

    someStream.foreachRDD { rdd =>
      rdd.foreachPartition { iterator =>
        // Resolved on the executor, so nothing non-serializable is captured
        val schemaClient = RegistryWrapper.schemaClient
        iterator.foreach(schemaClient.send(_))
      }
    }
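
    Because the wrapper object is referenced by name rather than captured by value, the DStream graph stays serializable and can still be checkpointed and recovered. As a sketch, the standard recovery setup looks like this (the app name, checkpoint path, and batch interval are assumptions):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("registry-example") // hypothetical app name
      val ssc = new StreamingContext(conf, Seconds(10))         // assumed batch interval
      ssc.checkpoint("/tmp/checkpoints")                        // assumed checkpoint directory
      // ... define someStream and the foreachRDD logic above here ...
      ssc
    }

    // Recovers the graph from the checkpoint if one exists,
    // otherwise builds a fresh context
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoints", createContext _)
    ssc.start()
    ssc.awaitTermination()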