scala, apache-spark, apache-kafka, spark-structured-streaming

Encoder issue in Spark Structured Streaming: works in the REPL only


I have a working process that ingests and deserializes Kafka Avro messages using Schema Registry. It works great in the REPL, but when I try to compile it I get:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       .map(x => {

I'm not sure whether I need to modify my object, but why would I need to if it works fine in the REPL?

    object AgentDeserializerWrapper {
      val props = new Properties()
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      val vProps = new kafka.utils.VerifiableProperties(props)
      val deser = new KafkaAvroDecoder(vProps)
      val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueNameAgentRead)
      val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
    }

    case class DeserializedFromKafkaRecord( value: String)

    import spark.implicits._

    val agentStringDF = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      .map(x => {
        DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })

Solution

  • Add .as[DeserializedFromKafkaRecord] so that the Dataset is statically typed:

    val agentStringDF = spark
      .readStream
      .format("kafka")
      .option("subscribe", "agent")
      .options(kafkaParams)
      .load()
      .as[DeserializedFromKafkaRecord]
      .map(x => {
        DeserializedFromKafkaRecord(AgentDeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), AgentDeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })
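
For reference, here is a minimal sketch of a shape that should compile outside the REPL. It rests on two assumptions that the question does not confirm: that in the compiled project the case class was declared inside a method (spark.implicits._ derives the Product encoder from a TypeTag, which the compiler cannot supply for a class local to a method), and that spark.implicits._ is imported only after the SparkSession value exists. The EncoderSketch object and its decode helper are hypothetical stand-ins for the Avro decoding step, and a small in-memory Dataset replaces the Kafka source so that the example runs on its own.

```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.{Dataset, SparkSession}

// Declared at top level (not inside a method) so that a TypeTag is available
// and spark.implicits._ can derive the encoder for this case class.
case class DeserializedFromKafkaRecord(value: String)

object EncoderSketch {
  // Hypothetical stand-in for the Avro decoding done by AgentDeserializerWrapper.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)

  // Maps raw message bytes to the case class; the encoder is resolved at compile time.
  def toRecords(spark: SparkSession,
                raw: Dataset[Array[Byte]]): Dataset[DeserializedFromKafkaRecord] = {
    import spark.implicits._ // imported after the SparkSession value is in scope
    raw.map(bytes => DeserializedFromKafkaRecord(decode(bytes)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("encoder-sketch")
      .getOrCreate()
    import spark.implicits._

    // In-memory stand-in for the Kafka "value" column (assumption for illustration).
    val raw = Seq("a", "b").map(_.getBytes(StandardCharsets.UTF_8)).toDS()
    toRecords(spark, raw).show()
    spark.stop()
  }
}
```

With the Kafka source, the same shape would start from .load().select("value").as[Array[Byte]], so that the lambda receives the raw bytes directly. After a typed .as[...], the lambda's argument is the typed element rather than a Row, so Row accessors such as getAs are not available on it.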