Tags: apache-spark, apache-kafka, apache-spark-sql, confluent-schema-registry

How to extract the schema ID from a Confluent-serialized Avro payload in Scala


I understand that Confluent-serialized Avro data has byte 0 as a magic byte and bytes 1-4 holding the schema ID. Extracting that ID is an important requirement for me, but I'm not able to do it using Spark functions.

Can someone help me out with the equivalent code in Scala?


Solution

  • From a plain Scala KafkaConsumer, you'd use ByteArrayDeserializer in the consumer config, wrap the consumed value in a ByteBuffer, then call get() once (to discard the magic byte) followed by getInt() to read the ID; see the plain-consumer sketch at the end of this answer. But that isn't necessary in Spark: Spark always returns binary columns for Kafka DataFrames and doesn't accept custom Kafka deserializer classes.

    Refer to this blog post on using the substring() function on the byte array to get the ID, and on how to actually use that ID with the Schema Registry.

    Example in PySpark

    import pyspark.sql.functions as fn
    from pyspark.sql.types import StringType
    
    # Interpret the raw bytes as a big-endian integer, rendered as a string
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
    
    # substring() is 1-indexed: position 1 is the magic byte, so bytes 2-5 hold the schema ID
    kafka_df = spark.readStream.format("kafka")\
      ...  # .option(...) calls and .load() go here, so .withColumn is applied to a DataFrame
      .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
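
    Since the question asks for Scala, the same idea with Structured Streaming looks roughly like the sketch below (e.g. pasted into spark-shell, where spark already exists). The broker address, topic name, and the valueSchemaId column name are placeholders carried over from the PySpark example, not anything mandated by Spark:

    import java.nio.ByteBuffer
    import org.apache.spark.sql.functions.{expr, udf}
    
    // Read the 4 bytes following the magic byte as a big-endian Int
    val binaryToInt = udf((bytes: Array[Byte]) => ByteBuffer.wrap(bytes).getInt)
    
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "my-topic")                     // placeholder
      .load()
      // substring() is 1-indexed: skip the magic byte at position 1, take the next 4 bytes
      .withColumn("valueSchemaId", binaryToInt(expr("substring(value, 2, 4)")))

    The resulting valueSchemaId column holds the integer ID you'd then use to look up the writer schema in the Schema Registry.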
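
    And for completeness, the plain-consumer approach mentioned at the top of this answer, outside Spark. This is a sketch assuming Scala 2.13; the broker address, group ID, and topic name are assumptions:

    import java.nio.ByteBuffer
    import java.time.Duration
    import java.util.Properties
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption
    props.put("group.id", "schema-id-demo")          // assumption
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
    
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    consumer.subscribe(List("my-topic").asJava) // assumption
    
    for (record <- consumer.poll(Duration.ofSeconds(5)).asScala) {
      val buffer = ByteBuffer.wrap(record.value())
      require(buffer.get() == 0, "not Confluent wire format") // byte 0: magic byte
      val schemaId = buffer.getInt                            // bytes 1-4: schema ID
      println(s"schema id = $schemaId")
    }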