
Serializing generic Avro records as an Array[Byte] keeps the schema in the object


Situation

I'm currently writing a consumer/producer using Avro and a schema repository.

From what I gather, my options for serializing this data are either Confluent's Avro serializer or Twitter's Bijection.

Bijection looked the most straightforward.

So I want to produce data in the format ProducerRecord[String, Array[Byte]], which comes down to [some string ID, serialized GenericRecord].

(Note: I'm going for generic records, as this codebase has to handle thousands of schemas that get parsed from JSON/CSV/...)

Question:

The whole reason I serialize and use Avro is that you don't need to have a schema in the data itself (as you would with JSON/XML/...).
When checking the data in the topic, however, I see the whole schema is included together with the data. Am I doing something fundamentally wrong, is this by design, or should I use the Confluent serializer instead?

Code:

  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import com.twitter.bijection.avro.GenericAvroCodecs
  import tech.allegro.schema.json2avro.converter.JsonAvroConverter

  def jsonStringToAvro(jString: String, schema: Schema): GenericRecord = {
    val converter = new JsonAvroConverter
    // Parse the JSON payload into a GenericRecord conforming to the given schema
    converter.convertToGenericDataRecord(jString.replaceAll("\\\\/", "_").getBytes(), schema)
  }

  def serializeAsByteArray(avroRecord: GenericRecord): Array[Byte] = {
    // Build a binary Injection for this record's schema and apply it to the record
    GenericAvroCodecs.toBinary[GenericRecord](avroRecord.getSchema).apply(avroRecord)
  }

  import org.apache.kafka.clients.producer.{Callback, ProducerRecord}

  // The schema comes from a REST call to the schema repository
  val producerRecord = new ProducerRecord[String, Array[Byte]](
    topic,
    myStringKeyGoesHere,
    serializeAsByteArray(jsonStringToAvro(jsonObjectAsStringGoesHere, schema))
  )

  producer.send(producerRecord, new Callback {...})

Solution

  • If you look at the Confluent source code, you'll see that the order of operations for interacting with a schema repository is

    1. Take the schema from the Avro record and compute its ID. Ideally, POST-ing the schema to the repository returns an ID; otherwise, hashing the schema should give you one.
    2. Allocate a ByteBuffer.
    3. Write the returned ID to the buffer.
    4. Write the Avro object's value (excluding the schema) as bytes into the buffer.
    5. Send that byte buffer to Kafka.

    Presently, your Bijection usage will include the schema in the bytes rather than replacing it with an ID (see the sketch below).
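
    For illustration, here is a minimal sketch of that framing, assuming Confluent's wire format (a 0 magic byte, a 4-byte big-endian schema ID, then the Avro binary body). The helper name and the schemaId parameter are hypothetical; in practice the ID comes back from POST-ing the schema to the repository:

      import java.io.ByteArrayOutputStream
      import java.nio.ByteBuffer
      import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
      import org.apache.avro.io.EncoderFactory

      // Hypothetical helper; schemaId is assumed to come from the schema repository (step 1)
      def serializeConfluentStyle(record: GenericRecord, schemaId: Int): Array[Byte] = {
        // Encode the record body with plain Avro binary encoding -- no schema is written
        val out = new ByteArrayOutputStream()
        val encoder = EncoderFactory.get().binaryEncoder(out, null)
        new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
        encoder.flush()
        val body = out.toByteArray

        val buffer = ByteBuffer.allocate(1 + 4 + body.length) // step 2
        buffer.put(0.toByte)      // magic byte used by Confluent's format
        buffer.putInt(schemaId)   // step 3: 4-byte schema ID, big-endian
        buffer.put(body)          // step 4: schema-less Avro bytes
        buffer.array()            // step 5: this array is the ProducerRecord value
      }

    A consumer then reads the first five bytes, looks the schema up by ID, and decodes the remainder; Confluent's KafkaAvroSerializer/KafkaAvroDeserializer pair does exactly this bookkeeping for you.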