I understand that Avro-serialized data has byte 0 as a magic byte and bytes 1-4 holding the schema ID. This is a very important requirement for me, but I'm not able to extract the schema ID using Spark functions.
Can someone help me out with the equivalent code in Scala?
From a plain Scala KafkaConsumer, you'd use ByteArrayDeserializer for the value in the consumer config, wrap the consumed value in a ByteBuffer, then call get() to discard the magic byte and getInt() to fetch the ID. But that isn't necessary in Spark: Spark always returns binary types for Kafka DataFrames and doesn't accept custom Kafka deserializer classes.
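For reference, a minimal sketch of that plain-consumer approach (assuming the value was consumed as Array[Byte]; the helper name schemaId is just illustrative):

import java.nio.ByteBuffer
import org.apache.kafka.clients.consumer.ConsumerRecord

// Extract the schema ID from a Confluent-framed Avro message
def schemaId(record: ConsumerRecord[String, Array[Byte]]): Int = {
  val buffer = ByteBuffer.wrap(record.value())
  buffer.get()    // discard the magic byte at position 0
  buffer.getInt() // bytes 1-4: the schema ID, big-endian
}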
Refer to this blog on using the substring() function on a byte array to get the ID, and further on how to actually use that with the Registry.
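In case the blog link goes stale: once you have the ID, the Registry's standard REST endpoint GET /schemas/ids/{id} returns the writer's schema as JSON. A rough sketch (the registry URL and ID are placeholders):

import scala.io.Source

val registryUrl = "http://localhost:8081" // placeholder: your Schema Registry URL
val id = 42                               // placeholder: the extracted schema ID

// Standard Schema Registry endpoint; the response is {"schema": "<avro schema json>"}
val response = Source.fromURL(s"$registryUrl/schemas/ids/$id").mkString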
Example in PySpark
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# Interpret the 4 extracted bytes as a big-endian integer (the schema ID)
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

# substring() is 1-indexed: start at 2 to skip the magic byte, take 4 bytes
kafka_df = spark.readStream.format("kafka")\
    ...
    .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
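And since you asked for Scala, a rough equivalent using Spark's Scala API (assumes a SparkSession in scope as spark; the bootstrap server and topic are placeholders):

import java.nio.ByteBuffer
import org.apache.spark.sql.functions.{expr, udf}

// Interpret the 4 extracted bytes as a big-endian Int, i.e. the schema ID
val binaryToString = udf((bytes: Array[Byte]) => ByteBuffer.wrap(bytes).getInt.toString)

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .load()
  // substring is 1-indexed: starting at 2 skips the magic byte, length 4 = the ID
  .withColumn("valueSchemaId", binaryToString(expr("substring(value, 2, 4)")))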