Tags: scala, apache-spark, apache-kafka, spark-structured-streaming

Deserializing a structured stream from Kafka with Spark


I am trying to read a Kafka topic from Spark 3.0.2 with the Scala code below.

Here are my imports:

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

Functions to retrieve the schema from a Schema Registry service and convert it into a Spark schema:

var schemaRegistryClient: SchemaRegistryClient = _
var kafkaAvroDeserializer: AvroDeserializer = _

def lookupTopicSchema(topic: String): String = {
  schemaRegistryClient.getLatestSchemaMetadata(topic).getSchema
}

def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
  SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
}

Definition of the class that deserializes data from the Kafka topic:

object DeserializerWrapper {
  val deserializer: AvroDeserializer = kafkaAvroDeserializer
}

class AvroDeserializer extends AbstractKafkaAvroDeserializer {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  override def deserialize(bytes: Array[Byte]): String = {
    val value = super.deserialize(bytes)
    Option(value) match {
      case Some(Array()) | None => null
      case Some(a) =>  
        val genericRecord = a.asInstanceOf[GenericRecord]
        genericRecord.toString
    }
  }
}

The main function that executes the job:

def main(): Unit = {

  val spark = SparkSession.builder.getOrCreate

  val bootstrapServers = "kafka1:9192"
  val topic = "sample_topic"
  val shemaRegistryName = "avro_schema_A"
  val schemaRegistryUrl = "http://myHost:8001"

  schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

  spark.udf.register("deserialize", (bytes: Array[Byte]) => DeserializerWrapper.deserializer.deserialize(bytes))

  val kafkaDataFrame = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .option("kafka.group.id", "group_id")
    .option("startingOffsets", "latest")
    .load()
  )

  val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")

  import org.apache.spark.sql.functions._

  val dfValueSchema = {
    val rawSchema = lookupTopicSchema(shemaRegistryName)
    avroSchemaToSparkSchema(rawSchema)
  }

  val formattedDataFrame = (valueDataFrame
    .select(from_json(col("message"), dfValueSchema.dataType).alias("parsed_value"))
    .select("parsed_value.*")
  )

  (formattedDataFrame
    .writeStream
    .format("console")
    .outputMode(OutputMode.Append())
    .option("truncate", false)
    .start()
    .awaitTermination()
  )
}

When I execute the main function, the job crashes with the following error:

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+-----+-----+-----+-----+-----+
|col-A|col-B|col-C|col-D|col-E|col-F|col-G|
+-----+-----+-----+-----+-----+-----+-----+
+-----+-----+-----+-----+-----+-----+-----+

21/07/23 15:41:31 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job
21/07/23 15:41:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@227770fb is aborting.
21/07/23 15:41:31 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@227770fb aborted.
21/07/23 15:41:31 ERROR MicroBatchExecution: Query [id = 5c76f495-b2c8-46e7-90b0-dd715c8466f3, runId = c38ccde6-3c98-488c-bfd9-a71a17296503] terminated with error
org.apache.spark.SparkException: Writing job aborted.
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
        at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:581)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:581)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 13, 172.20.0.4, executor 1): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$1119/1813496428: (binary) => string)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:457)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res3$1(<console>:41)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:157)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1127)
        ... 17 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
        ... 37 more
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$1119/1813496428: (binary) => string)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1130)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:457)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:441)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line55.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res3$1(<console>:41)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$2(ScalaUDF.scala:157)
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1127)
        ... 17 more

It seems that the UDF throws a NullPointerException, but I explicitly implemented pattern matching to avoid it.

Can anybody please help with this? Thanks a lot.
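
One possible explanation, sketched here as an assumption rather than a confirmed diagnosis: the NullPointerException may be thrown on DeserializerWrapper.deserializer itself rather than inside the pattern match. In the code above, val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) inside main declares a new local value and shadows the outer var kafkaAvroDeserializer, which therefore stays null, and DeserializerWrapper reads that outer var when it is first initialized. A minimal sketch of the corrected wiring, keeping everything else unchanged, would be:

// Assign the outer var instead of shadowing it with a local val,
// so DeserializerWrapper.deserializer sees an initialized instance.
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)

spark.udf.register("deserialize", (bytes: Array[Byte]) =>
  DeserializerWrapper.deserializer.deserialize(bytes))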


Solution

  • I finally got the deserialization working using the ABRiS library. It makes it very easy to connect to the Schema Registry and perform ser/deser operations on the Kafka stream.

    @OneCricketeer I really appreciate your help on this.

    For information, the connection to the Schema Registry can be set up as simply as:

    import za.co.absa.abris.config.AbrisConfig
    val abrisConfig = AbrisConfig
        .fromConfluentAvro
        .downloadReaderSchemaById(123)
        .usingSchemaRegistry("http://schema-registry.com:8081")
    

    Then, the conversion is done with a single line of code:

    import za.co.absa.abris.avro.functions.from_avro
    val outputDf = inputStreamDf.select(from_avro(col("value"), abrisConfig) as "data")
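
    For completeness, here is a minimal end-to-end sketch of how these two snippets plug into the original job. The bootstrap servers, topic, schema id, and registry URL are the placeholder values used in this post, and ABRiS 4.x built for Spark 3 is assumed:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.OutputMode
    import za.co.absa.abris.avro.functions.from_avro
    import za.co.absa.abris.config.AbrisConfig

    val spark = SparkSession.builder.getOrCreate()

    // Reader schema is downloaded from the Schema Registry by its id
    val abrisConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaById(123)
      .usingSchemaRegistry("http://schema-registry.com:8081")

    // Same Kafka source as in the question
    val inputStreamDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:9192")
      .option("subscribe", "sample_topic")
      .option("startingOffsets", "latest")
      .load()

    // from_avro decodes the Confluent-framed Avro payload into a struct column
    val outputDf = inputStreamDf.select(from_avro(col("value"), abrisConfig) as "data")

    (outputDf
      .select("data.*")
      .writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
    )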