Tags: scala, apache-kafka, deserialization, avro, spark-structured-streaming

spark scala kafka avro deserializer


I am consuming a streaming DataFrame from Kafka. The data in the value column is Avro-encoded, and I want to deserialize it into a struct type.

Currently I have written a UDF that calls the method public Object deserialize(String topic, byte[] bytes) from KafkaAvroDeserializer. I call this UDF on the value column of the streaming DataFrame, and it successfully returns a JSON string.

Instead of the JSON string, I want the value column to be of struct type. For example, right now I get {"name": "ABC", "age": 17}; I want value to be a struct so that I can write value.name or value.age.

The problem is that I have no case class, sample JSON, or schema file with which to convert the JSON string into a struct type. The same code must work for different topics that carry different data.

Any guidance would be very helpful. Please let me know if I am thinking about this, or doing the deserialization, the wrong way.

UDF code:

  import io.confluent.kafka.serializers.KafkaAvroDeserializer
  import org.apache.avro.generic.GenericRecord
  import org.apache.spark.sql.expressions.UserDefinedFunction
  import org.apache.spark.sql.functions.udf
  import scala.collection.JavaConverters._

  val kafkaAvroDeserializer = new KafkaAvroDeserializer()
  private val kafkaAvroDeserializerConfig: Map[String, Any] = Map(
    "schema.registry.url" -> "<url for schema registry>"
  )
  // isKey = false: we are deserializing the value, not the key
  kafkaAvroDeserializer.configure(kafkaAvroDeserializerConfig.asJava, false)

  val deserializationFunction: UserDefinedFunction = udf((input: Array[Byte]) => {
    val genericRecord = kafkaAvroDeserializer.deserialize("topic", input)
      .asInstanceOf[GenericRecord]
    genericRecord.toString // currently returns the JSON string successfully
  })

Utilising the UDF (Data is a simple case class with key: String and value: String):

      streamingDF.selectExpr("key", "value")
        .as[Data]
        .select(col("key"), deserializationFunction(col("value")).as("value"))

This is not a duplicate question. I got a prompt asking whether the other question was helpful in answering mine; I clicked yes because it gave me more insight, not realising that this would close my question and mark it as a duplicate (I had said the same in the comments earlier). I have gone through the link, and the solutions there almost work as expected, but there are a couple of small issues: the first solution shows as deprecated in my IDE, the second does not include other columns like key, topic, and offset, and neither works on a streaming DataFrame unless I use foreachBatch. Honestly, I was expecting a solution built around the UDF.


Solution

  • You now have a JSON string, so Avro isn't really relevant.

    You can use get_json_object with a JsonPath expression to pull individual fields out of it, as sketched below.

    Otherwise, plenty of options are shown in Integrating Spark Structured Streaming with the Confluent Schema Registry, including parsing the JSON into a proper struct; see the second sketch after this list.
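
A minimal sketch of the get_json_object route, applied on top of the UDF output. The field names name and age are just the ones from the example in the question, and extractedDF is a hypothetical name:

  import org.apache.spark.sql.functions.{col, get_json_object}

  // "value" here is the JSON string produced by deserializationFunction.
  val extractedDF = streamingDF
    .select(col("key"), deserializationFunction(col("value")).as("value"))
    .withColumn("name", get_json_object(col("value"), "$.name"))
    .withColumn("age", get_json_object(col("value"), "$.age").cast("int"))

This works on a streaming DataFrame without foreachBatch, but each field has to be extracted individually and comes back as a string unless explicitly cast.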
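
If struct-style access (value.name, value.age) is the real goal and one representative JSON document per topic can be obtained, a hedged alternative is to infer a schema with schema_of_json and then parse the string into a real struct with from_json. The sample document below is an assumption, not part of the original code, and spark is the usual SparkSession:

  import org.apache.spark.sql.functions.{col, from_json, lit, schema_of_json}
  import org.apache.spark.sql.types.DataType

  // Hypothetical sample document for this topic; in practice it would have
  // to come from somewhere, e.g. one record read in a separate batch query.
  val sampleJson = """{"name": "ABC", "age": 17}"""

  // schema_of_json infers a DDL schema string such as
  // "STRUCT<age: BIGINT, name: STRING>"; evaluate it once in a batch query.
  val schemaDDL = spark.range(1)
    .select(schema_of_json(lit(sampleJson)))
    .head()
    .getString(0)

  // Parse the JSON string column into a struct; value.name and value.age
  // are then directly addressable.
  val structDF = streamingDF
    .select(col("key"), deserializationFunction(col("value")).as("value"))
    .select(col("key"), from_json(col("value"), DataType.fromDDL(schemaDDL)).as("value"))

The schema inference runs as a tiny batch job against a literal, but the resulting from_json call works fine on the streaming DataFrame itself.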