java, apache-kafka, avro, confluent-schema-registry

Deserializing a byte[] array from a Kafka ByteArrayDeserializer into a GenericRecord or a SpecificRecord with schema


I currently have an event stream on a Kafka topic that sends a schema-registry-managed event through Java Spring Kafka. On the producer side, I'm sending with no problems:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

I can receive these messages without any problem by specifying the Kafka receiver as follows:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

I already have access to the generated Java class that is pulled from the schema registry, and with this config I can deserialize without a problem.

Where I am facing an issue is that I have a consumer that has to receive the message using:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

No matter what I try implementation-wise, every attempt throws one of the following exceptions, depending on the implementation:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -55

This is the only prior question I can find. I have tried deserialization with and without the schema, and the same errors occur. I do have access to the schema at deserialization time, via the same generated Java class used to send the message on the producer side.

Deserialize avro to generic record without schema

I have attached example implementations at the bottom of this post to show what has not worked. The following has been tried with and without the specific schema, as well as deserializing to a single object versus a list of objects or a GenericRecord:

//event is of type byte[]
LOG.info("Received Event: {}", event);
LOG.debug("data='{}'", DatatypeConverter.printHexBinary(event));
ByteArrayInputStream in = new ByteArrayInputStream(event);
DatumReader<SpecificSchemaObject> userDatumReader =
    new SpecificDatumReader<>(SpecificSchemaObject.getClassSchema());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
List<SpecificSchemaObject> records = new ArrayList<>();
try {
  // Keep reading records until the decoder runs out of input.
  while (true) {
    try {
      records.add(userDatumReader.read(null, decoder));
    } catch (EOFException eof) {
      break;
    }
  }
  LOG.info("deserialized data='{}'", records);
} catch (IOException ioe) {
  LOG.error("failed to deserialize event", ioe);
}
Schema schema = WebsiteAdminEvent.getClassSchema();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
Decoder decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(event), null);
GenericRecord gr = null;
try {
  gr = datumReader.read(null, decoder);
} catch (IOException e) {
  throw new RuntimeException(e);
}
LOG.info("got record: {}", gr);
//event is of type byte[]
List<WebsiteAdminEvent> listOfRecords = new ArrayList<>();
DatumReader<WebsiteAdminEvent> reader = new GenericDatumReader<>();
try (DataFileReader<WebsiteAdminEvent> fileReader =
         new DataFileReader<>(new SeekableByteArrayInput(event), reader)) {
  WebsiteAdminEvent record = null;
  while (fileReader.hasNext()) {
    listOfRecords.add(fileReader.next(record));
  }
} catch (IOException e) {
  throw new RuntimeException(e);
}

Solution

  • If the producer used the Schema Registry, then the encoded Avro bytes are not "plain" Avro: they start with a 5-byte header (one magic byte, 0, followed by the 4-byte schema ID) that is not part of the Avro spec. The bytes are also not in the Avro file container format, so DataFileReader should not be used.
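    For example, once that 5-byte header is skipped, the raw-Avro decoding attempts from the question start to work. Below is a minimal sketch, assuming the standard Confluent wire format and the generated SpecificSchemaObject class from the question; it also assumes the writer and reader schemas match (in general, the schema ID would be used to fetch the writer schema from the registry):

    ByteBuffer buffer = ByteBuffer.wrap(event);
    if (buffer.get() != 0x0) {
      throw new IllegalArgumentException("Unknown magic byte; not Confluent wire format");
    }
    int schemaId = buffer.getInt(); // 4-byte schema ID; usable to look up the writer schema

    // The Avro binary payload starts at byte 5; decode it with the reader schema.
    DatumReader<SpecificSchemaObject> reader =
        new SpecificDatumReader<>(SpecificSchemaObject.getClassSchema());
    BinaryDecoder decoder =
        DecoderFactory.get().binaryDecoder(event, 5, event.length - 5, null);
    SpecificSchemaObject value = reader.read(null, decoder);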

    Ultimately, there is no reason to use ByteArrayDeserializer here, but even if you do, nothing stops you from manually constructing a new KafkaAvroDeserializer() and calling its deserialize function, as shown in the sketch below. That class is open source if you want to see how it really works.
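    For instance, here is a rough sketch of invoking the Confluent deserializer by hand from a consumer configured with ByteArrayDeserializer; the topic name and registry URL are placeholders:

    // Construct and configure the deserializer manually instead of letting
    // the consumer instantiate it via VALUE_DESERIALIZER_CLASS_CONFIG.
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://localhost:8081"); // placeholder URL
    config.put("specific.avro.reader", true); // return the generated SpecificRecord class

    try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer()) {
      deserializer.configure(config, false); // false = configuring a value deserializer

      // event is the byte[] handed over by the ByteArrayDeserializer
      SpecificSchemaObject record =
          (SpecificSchemaObject) deserializer.deserialize("my-topic", event);
    }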