apache-kafka, avro

Avro with Kafka - Deserializing with changing schema


Based on an Avro schema, I generated a class (Data) appropriate to the schema. After that, I encode the data and send it to another application "A" using Kafka:

    Data data; // <- the object was initialized before; this is only the declaration, for example
    EncoderFactory encoderFactory = EncoderFactory.get();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
    DatumWriter<Data> writer = new SpecificDatumWriter<Data>(Data.class);
    writer.write(data, encoder);
    byte[] avroByteMessage = out.toByteArray();

On the other side (in application "A") I deserialize the data by implementing Deserializer:

class DataDeserializer implements Deserializer<Data> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    @Override
    public Data deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                return null;
            } else {
                DatumReader<Data> reader = new SpecificDatumReader<Data>(Data.class);
                DecoderFactory decoderFactory = DecoderFactory.get();
                BinaryDecoder decoder = decoderFactory.binaryDecoder(data, null);
                Data decoded = reader.read(null, decoder);
                return decoded;
            }
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing byte[] to Data due to unsupported encoding " + encoding);
        }
    }
}

The problem is that this approach requires the use of SpecificDatumReader, i.e. the Data class has to be integrated with the application code. This could be problematic: the schema could change, and then the Data class would have to be re-generated and integrated once more. Two questions:

  1. Should I use GenericDatumReader in the application? How do I do that correctly? (I can simply save the schema in the application.)
  2. Is there a simple way to work with SpecificDatumReader if Data changes? How could it be integrated without much trouble?

Thanks


Solution

  • I use GenericDatumReader -- well, actually I derive my reader class from it, but you get the point. To use it, I keep my schemas in a special Kafka topic -- Schema surprisingly enough. Consumers and producers both, on startup, read from this topic and configure their respective parsers.

    If you do it like this, you can even have your consumers and producers update their schemas on the fly, without having to restart them. This was a design goal for me -- I didn't want to have to restart my applications in order to add or change schemas. Which is why SpecificDatumReader doesn't work for me, and honestly why I use Avro in the first place instead of something like Thrift.

    Update

    The normal way to do Avro is to store the schema in the file with the records. I don't do it that way, primarily because I can't. I use Kafka, so I can't store the schemas directly with the data -- I have to store the schemas in a separate topic.

    The way I do it, first I load all of my schemas. You can read them from a text file; but like I said, I read them from a Kafka topic. After I read them from Kafka, I have an array like this:

    val schemaArray: Array[String] = Array(
      """{"name":"MyObj","type":"record","fields":[...]}""",
      """{"name":"MyOtherObj","type":"record","fields":[...]}"""
    )
    

    Apologies for the Scala, BTW, but it's what I've got.
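
    In case it's useful, here is a rough sketch of how that array could be populated from the Schema topic on startup. The consumer settings, group id, and the single poll are assumptions of mine rather than part of the original setup, so adapt them to your deployment:

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import scala.collection.JavaConverters._

    // Assumed consumer config -- broker address and group id are placeholders
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "schema-loader")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("Schema"))

    // Drain whatever is currently in the topic; each record value is assumed
    // to be one schema as a JSON string
    val schemaArray: Array[String] =
      consumer.poll(Duration.ofSeconds(5)).asScala.map(_.value()).toArray
    consumer.close()
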

    At any rate, then you need to create a parser and, for each schema, parse it, create readers and writers, and save them off to Maps:

    val parser = new Schema.Parser()
    val schemas = Map(schemaArray.map{s => parser.parse(s)}.map(s => (s.getName, s)):_*)
    val readers = schemas.map(s => (s._1, new GenericDatumReader[GenericRecord](s._2)))
    val writers = schemas.map(s => (s._1, new GenericDatumWriter[GenericRecord](s._2)))
    var decoder: BinaryDecoder = null
    

    I do all of that before I parse an actual record -- that's just to configure the parser. Then, to decode an individual record I would do:

    val byteArray: Array[Byte] = ... // <-- Avro encoded record
    val schemaName: String = ... // <-- name of the Avro schema
    
    val reader = readers.get(schemaName).get
    
    decoder = DecoderFactory.get.binaryDecoder(byteArray, decoder)
    val record = reader.read(null, decoder)
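
    The write side isn't shown above, but it's symmetric. Here's a minimal sketch using the writers map built earlier; the schema name and field are made up for illustration:

    import java.io.ByteArrayOutputStream
    import org.apache.avro.generic.GenericData
    import org.apache.avro.io.EncoderFactory

    val schema = schemas("MyObj")                  // schema looked up by name
    val record = new GenericData.Record(schema)
    record.put("someField", "someValue")           // hypothetical field

    val writer = writers("MyObj")
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    val avroBytes: Array[Byte] = out.toByteArray   // bytes to publish to Kafka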