python apache-kafka avro confluent-schema-registry

Differentiating between binary encoded Avro and JSON messages


I'm using Python to read messages from several Kafka topics. Some topics carry plain JSON messages, while others use Avro binary serialization with the Confluent Schema Registry.

When I receive a message, I need to know whether it has to be decoded. At the moment I'm relying only on the fact that the binary-encoded messages start with a MAGIC_BYTE whose value is zero:

from confluent_kafka import Consumer

consumer = Consumer(config)
consumer.subscribe(...)
msg = consumer.poll()
# check that msg is not None and msg.error() is None, etc.
if msg.value()[0] == 0:
    # It is binary encoded
    ...
else:
    # It is JSON
    ...

I was wondering if there's a better way to do that.
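A slightly more defensive version of the check in the question, as a hedged sketch: it treats None (for example a tombstone) and payloads shorter than the five-byte header as not framed. The helper name has_confluent_header and the constants are hypothetical. It assumes the JSON producers write UTF-8 text; per RFC 8259 such JSON can only begin with whitespace or one of {, [, ", -, a digit, t, f or n, so it never starts with a 0x00 byte. Confluent's JSON Schema and Protobuf serializers use the same header, so a zero first byte really means "registry-framed" rather than strictly "Avro".

```python
from typing import Optional

MAGIC_BYTE = 0   # first byte of the Confluent wire format
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema ID


def has_confluent_header(value: Optional[bytes]) -> bool:
    """Heuristic: True if the payload starts like a Confluent-framed message.

    Assumes JSON producers emit UTF-8 text, which never begins with 0x00.
    """
    if not value or len(value) < HEADER_SIZE:
        # None (tombstone), empty, or too short to hold magic byte + schema ID
        return False
    return value[0] == MAGIC_BYTE
```

In the consumer loop this would replace the inline test, e.g. `if has_confluent_header(msg.value()): ...`.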


Solution

  • You could read the first five bytes of your message (indices 0 to 4) and split them:

    magic_byte = message_bytes[0]
    schema_id = int.from_bytes(message_bytes[1:5], "big")  # 4-byte big-endian ID
    

    Then look the ID up in your registry with GET /schemas/ids/{schema_id}, and cache the ID and schema (if needed) when you get a 200 response.

    Otherwise, the message is either JSON, or the producer sent its data to a different registry (if there is more than one in your environment). Note: in the latter case the data could still be Avro.
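The steps in this answer can be sketched end to end with only the Python standard library. This is a minimal sketch, assuming the registry is reachable at the hypothetical REGISTRY_URL and that GET /schemas/ids/{id} returns a JSON body with a "schema" field; the helper names (parse_header, fetch_schema, SchemaCache, classify) are illustrative and not part of any library. Only successful lookups are cached, so an ID that is unknown now will be retried on the next message. The "registry" label is deliberately not "avro", since other Confluent serializers share the same header.

```python
import json
import struct
import urllib.error
import urllib.request
from typing import Callable, Dict, Optional, Tuple

REGISTRY_URL = "http://localhost:8081"  # hypothetical; point at your registry


def parse_header(value: bytes) -> Optional[Tuple[int, int]]:
    """Return (magic_byte, schema_id) from the first 5 bytes, or None if too short."""
    if len(value) < 5:
        return None
    # ">BI": big-endian, 1 unsigned byte followed by a 4-byte unsigned int
    return struct.unpack(">BI", value[:5])


def fetch_schema(schema_id: int, base_url: str = REGISTRY_URL) -> Optional[str]:
    """GET /schemas/ids/{id}: return the schema string on 200, None on 404."""
    url = f"{base_url}/schemas/ids/{schema_id}"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return json.loads(resp.read())["schema"]
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return None  # this registry does not know the ID
        raise


class SchemaCache:
    """Caches schemas by ID; misses are not cached, so they are retried."""

    def __init__(self, fetch: Callable[[int], Optional[str]] = fetch_schema):
        self._fetch = fetch
        self._schemas: Dict[int, str] = {}

    def get(self, schema_id: int) -> Optional[str]:
        if schema_id not in self._schemas:
            schema = self._fetch(schema_id)
            if schema is None:
                return None
            self._schemas[schema_id] = schema
        return self._schemas[schema_id]


def classify(value: Optional[bytes], cache: SchemaCache) -> str:
    """Return "registry", "json" or "unknown" for a raw message value."""
    if not value:
        return "unknown"
    header = parse_header(value)
    # Framed and resolvable in this registry: decode with the cached schema
    if header is not None and header[0] == 0 and cache.get(header[1]) is not None:
        return "registry"
    # Not resolvable here: try JSON (bad encodings raise UnicodeDecodeError,
    # bad syntax raises JSONDecodeError; both subclass ValueError)
    try:
        json.loads(value)
    except ValueError:
        return "unknown"  # possibly framed data from another registry
    return "json"
```

A consumer would create one SchemaCache and call classify(msg.value(), cache) per message. If you already depend on confluent-kafka, recent versions ship confluent_kafka.schema_registry.SchemaRegistryClient, whose get_schema(schema_id) performs this lookup with its own caching and could stand in for fetch_schema and SchemaCache here.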