scala, apache-spark, spark-streaming

Task not serializable after adding it to foreachPartition


I am receiving a Task not serializable exception in Spark when attempting to implement an Apache Pulsar sink for Spark Structured Streaming.

I have already tried extracting the PulsarConfig into a separate class and instantiating it inside the .foreachPartition lambda, the same per-partition pattern I normally use for JDBC connections and other systems I integrate into Spark Structured Streaming; my sink and config classes are shown below.
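
For reference, the per-partition pattern looks roughly like this for JDBC; the URL, credentials, table name and the writeViaJdbc helper are illustrative placeholders, not code from the original post. The point is that the connection and statement are created inside the closure, on the executor, so the closure captures nothing non-serializable from the driver.

import java.sql.DriverManager

import org.apache.spark.sql.DataFrame

def writeViaJdbc(df: DataFrame): Unit = {
  df.toJSON.foreachPartition { partition: Iterator[String] =>
    // Connection objects are constructed per partition on the executor,
    // never on the driver, so they never need to be serialized.
    val conn = DriverManager.getConnection("jdbc:postgresql://host:5432/db", "user", "pass")
    val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
    try {
      partition.foreach { rec =>
        stmt.setString(1, rec)
        stmt.addBatch()
      }
      stmt.executeBatch()
    } finally {
      stmt.close()
      conn.close()
    }
  }
}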

PulsarSink Class

import java.util.concurrent.TimeUnit

import org.apache.pulsar.client.api.{CompressionType, Schema}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

class PulsarSink(
                sqlContext: SQLContext,
                parameters: Map[String, String],
                partitionColumns: Seq[String],
                outputMode: OutputMode)  extends Sink{

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.toJSON.foreachPartition( partition => {
      val pulsarConfig = new PulsarConfig(parameters).client
      val producer = pulsarConfig.newProducer(Schema.STRING)
        .topic(parameters.get("topic").get)
        .compressionType(CompressionType.LZ4)
        .sendTimeout(0, TimeUnit.SECONDS)
        .create
      partition.foreach(rec => producer.send(rec))
      producer.flush()
    })
  }
}

PulsarConfig Class

import org.apache.pulsar.client.api.{Authentication, AuthenticationFactory, PulsarClient}
import org.apache.pulsar.client.impl.auth.AuthenticationTls

class PulsarConfig(parameters: Map[String, String]) {

  def client(): PulsarClient = {
    import scala.collection.JavaConverters._

    if(!parameters.get("tlscert").isEmpty && !parameters.get("tlskey").isEmpty) {
      val tlsAuthMap = Map("tlsCertFile" -> parameters.get("tlscert").get,
        "tlsKeyFile" -> parameters.get("tlskey").get).asJava

      val tlsAuth: Authentication = AuthenticationFactory.create(classOf[AuthenticationTls].getName, tlsAuthMap)
      PulsarClient.builder
        .serviceUrl(parameters.get("broker").get)
        .tlsTrustCertsFilePath(parameters.get("tlscert").get)
        .authentication(tlsAuth)
        .enableTlsHostnameVerification(false)
        .allowTlsInsecureConnection(true)
        .build
    }
    else{
      PulsarClient.builder
        .serviceUrl(parameters.get("broker").get)
        .enableTlsHostnameVerification(false)
        .allowTlsInsecureConnection(true)
        .build
    }
  }
}
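
For context, a custom Sink like the one above gets wired into Structured Streaming through a StreamSinkProvider. A minimal sketch of such a provider is below; the PulsarSinkProvider class name and the "pulsar" short name are illustrative, not taken from the original code.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Structured Streaming looks this provider up (by fully qualified class name,
// or by shortName if registered as a data source) and calls createSink when
// the query starts, passing the writeStream options as `parameters`.
class PulsarSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def shortName(): String = "pulsar"

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink =
    new PulsarSink(sqlContext, parameters, partitionColumns, outputMode)
}

The streaming query then references the provider via .writeStream.format(...), and the options supplied there arrive in the parameters map used by PulsarSink and PulsarConfig.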

The error message I receive is the following:

ERROR StreamExecution: Query [id = 12c715c2-2d62-4523-a37a-4555995ccb74, runId = d409c0db-7078-4654-b0ce-96e46dfb322c] terminated with error
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2341)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2341)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
    at org.apache.spark.datamediation.impl.sink.PulsarSink.addBatch(PulsarSink.scala:20)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: java.io.NotSerializableException: org.apache.spark.datamediation.impl.sink.PulsarSink
Serialization stack:
    - object not serializable (class: org.apache.spark.datamediation.impl.sink.PulsarSink, value: org.apache.spark.datamediation.impl.sink.PulsarSink@38813f43)
    - field (class: org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.datamediation.impl.sink.PulsarSink)
    - object (class org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
    ... 31 more
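
The "Serialization stack" lines are the relevant part: the foreachPartition lambda reads parameters, a constructor field of PulsarSink, so the closure keeps an $outer reference to the whole PulsarSink instance, and that instance is not serializable. The same failure can be reproduced with a much smaller, Pulsar-free sketch (the Repro class and its field are made up for illustration):

import org.apache.spark.sql.Dataset

// Repro is not Serializable. The lambda reads `tag`, which is really `this.tag`,
// so Spark has to serialize the enclosing Repro instance ($outer) and fails with
// "Task not serializable", exactly like PulsarSink above.
class Repro(tag: String) {
  def run(ds: Dataset[String]): Unit =
    ds.foreachPartition { it: Iterator[String] =>
      it.foreach(rec => println(tag + rec))
    }
}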

Solution

  • Values used inside the foreachPartition closure can be copied from class-level members into local function variables first. Because the lambda then only reads the local copy, it no longer needs the $outer reference to the non-serializable PulsarSink instance, and the closure serializes cleanly:

    override def addBatch(batchId: Long, data: DataFrame): Unit = {
      val parametersLocal = parameters
      data.toJSON.foreachPartition( partition => {
        val pulsarConfig = new PulsarConfig(parametersLocal).client