I'm using Python to read messages coming from various Kafka topics. Some topics have their messages encoded as plain JSON, while others use Avro binary serialization with the Confluent Schema Registry.
When I receive a message, I need to know whether it has to be decoded. At the moment I'm relying only on the fact that the binary-encoded messages start with a MAGIC_BYTE whose value is zero:
from confluent_kafka import Consumer
consumer = Consumer(config)
consumer.subscribe(...)
msg = consumer.poll()
# check the msg is not null or error etc
if msg.value()[0] == 0:
    # It is binary encoded
    pass
else:
    # It is json
    pass
I was wondering if there's a better way to do that.
You could take the first five bytes (indices 0-4) of your message, then
magic_byte = message_bytes[0]
schema_id = message_bytes[1:5]
Then convert schema_id to an integer (it is a 4-byte big-endian value) and perform a lookup against your registry with GET /schemas/ids/{schema_id}, caching the ID + schema (if needed) when you get a 200 response code.
Otherwise, the message is either JSON, or the producer sent its data using a different registry (if there is more than one in your environment). Note: this means the data could still be Avro.