Tags: scala, apache-spark, apache-kafka, spark-streaming, avro

Deserialising Avro formatted data from Kafka in Spark Streaming gives empty String and 0 for long


I'm struggling to deserialise Avro-serialised data coming off Kafka in Spark Streaming.

This is the file I am running through spark-submit:

package com.example.mymessage

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object MyMessageCount extends Logging {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: MyMessageCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN]." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("MyMessageCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    lines.foreachRDD(rdd => {
      rdd.foreach(avroRecord => {
        val schemaString = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"string\",\"type\":\"string\"},{\"name\":\"long\",\"type\":\"long\"}]}"
        val parser = new Schema.Parser()
        val schema = parser.parse(schemaString)
        val reader = new GenericDatumReader[GenericRecord](schema)

        val decoder = DecoderFactory.get.binaryDecoder(avroRecord.toCharArray.map(_.toByte), null)
        val record: GenericRecord = reader.read(null, decoder)

        System.out.println(avroRecord + "," + record.toString 
          + ", string= " + record.get("string")
          + ", long=" + record.get("long"))
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

I have been using the Confluent platform to send test data to it locally.

If I send:

{"string":"test","long":30}

Then the above code outputs:

test<,{"string": "", "long": 0}, string= , long=0

This suggests that the data is coming through, but the string and long fields are being deserialised to what look like their default values. How can I access the true "string" and "long" values coming into avroRecord from Kafka?


Solution

  • Using Confluent's KafkaAvroDecoder with a direct stream worked for this.

    import io.confluent.kafka.serializers.KafkaAvroDecoder
    
    ...
    
    // Note: metadata.broker.list expects Kafka broker host:port pairs (the
    // zkQuorum argument is reused here to carry them), and schemaRegistry
    // holds the schema registry URL, e.g. http://localhost:8081.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> zkQuorum,
      "schema.registry.url" -> schemaRegistry,
      "auto.offset.reset" -> "smallest")
    val topicSet = Set(topics)
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      ssc, kafkaParams, topicSet).map(_._2)
    
    messages.foreachRDD(rdd => {
      rdd.foreach(avroRecord => {
        println(avroRecord)
      })
    })
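
    With this approach each message value arrives already deserialised: for a record schema, KafkaAvroDecoder hands back an Avro GenericRecord (typed as Object above), so the fields from the question can be read directly. A minimal sketch:

    import org.apache.avro.generic.GenericRecord
    
    messages.foreachRDD(rdd => {
      rdd.foreach(msg => {
        // The decoder has already stripped the Confluent framing and looked up
        // the writer's schema, so the real field values come through.
        val record = msg.asInstanceOf[GenericRecord]
        println("string=" + record.get("string") + ", long=" + record.get("long"))
      })
    })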
    

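    For context on why the original code printed defaults: Confluent's serializer does not write bare Avro binary, it prepends a magic byte (0) and a 4-byte schema-registry ID to every message. Avro's binary decoder then reads that leading 0 as a zero-length string, and the next byte of the schema ID (0 for small IDs) as the long 0, which matches the empty string and 0 in the output above; the toCharArray.map(_.toByte) round trip through String also mangles binary payloads. If you wanted to keep the GenericDatumReader approach without KafkaAvroDecoder, a sketch along these lines would skip the framing first. decodeConfluentAvro is a made-up helper name, and it assumes the message value is consumed as raw bytes (kafka.serializer.DefaultDecoder) rather than as a String:

    import java.nio.ByteBuffer
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    
    // Hypothetical helper: strip the Confluent framing, then decode the payload.
    def decodeConfluentAvro(payload: Array[Byte], schema: Schema): GenericRecord = {
      val buffer = ByteBuffer.wrap(payload)
      require(buffer.get() == 0, "unexpected magic byte") // Confluent framing marker
      buffer.getInt()                                     // skip the 4-byte schema ID
      val avroBytes = new Array[Byte](buffer.remaining())
      buffer.get(avroBytes)
      val reader = new GenericDatumReader[GenericRecord](schema)
      reader.read(null, DecoderFactory.get.binaryDecoder(avroBytes, null))
    }
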
    I ran into a separate issue: with the resolver below I could only pull in version 1.0 of kafka-avro-serializer, not more recent versions.

    libraryDependencies ++= Seq(
      "io.confluent" % "kafka-avro-serializer" % "1.0",
      ...
    )
    
    resolvers ++= Seq(
      Resolver.sonatypeRepo("public"),
      Resolver.url("confluent", url("http://packages.confluent.io/maven/"))
    )
    

    UPDATE: the following worked to get the latest version of kafka-avro-serializer. The fix is the resolver declaration: Resolver.url builds an Ivy-style resolver, whereas the Confluent repository is a Maven repository and needs to be declared with `at`.

    libraryDependencies ++= Seq(
      "io.confluent" % "kafka-avro-serializer" % "3.0.0",
      ...
    )
    
    resolvers ++= Seq(
      Resolver.sonatypeRepo("public"),
      "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
    )