Tags: avro, spring-cloud-stream, confluent-schema-registry

Unrecognised header bytes error when trying to decode an Avro message in Spring Cloud Stream


I am trying to write a test case for my Spring Cloud Stream application. I am using Confluent Schema Registry with Avro, so I need to decode the message after polling from the channel. Here is my code:

    processor.input()
        .send(MessageBuilder.withPayload(InputData).build());

    Message<?> message = messageCollector.forChannel(processor.output()).poll();

    BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
    OutputData outputObject = decoder.decode((byte[]) message.getPayload());

For some reason, this code throws:

    org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x08

I am not sure whether this is a bug or whether I am decoding the received Avro message incorrectly. I suspect I need to set a header on the message, but I am not sure which header or what value it should carry. I would appreciate any help with this.

P.S: I am using spring-cloud-stream-test-support for the purpose of this test.


Solution

  • It turns out that the issue was related to how I was trying to decode the Avro message. Reading the payload as plain Avro binary with a DatumReader, instead of using BinaryMessageDecoder, worked for me:

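    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    Decoder decoder = DecoderFactory.get().binaryDecoder((byte[]) message.getPayload(), null);
    DatumReader<OutputData> reader = new SpecificDatumReader<>(OutputData.getClassSchema());
    OutputData outputObject = reader.read(null, decoder);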
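
A likely explanation for the original exception: BinaryMessageDecoder reads Avro's single-object encoding, which starts with the two marker bytes 0xC3 0x01 followed by an 8-byte schema fingerprint. A payload that begins with 0x00 0x08 does not carry that header, so it has to be read as plain Avro binary instead. The sketch below is only an illustration of that check in plain Java; the class name AvroHeaderCheck, the method hasSingleObjectHeader, and the sample byte arrays are hypothetical and not part of Avro or Spring Cloud Stream.

```java
public class AvroHeaderCheck {
    // Avro single-object encoding starts with the marker bytes 0xC3 0x01,
    // followed by an 8-byte schema fingerprint and then the Avro binary body.
    private static final byte MARKER_0 = (byte) 0xC3;
    private static final byte MARKER_1 = (byte) 0x01;
    private static final int HEADER_LENGTH = 10; // 2 marker bytes + 8 fingerprint bytes

    /** Returns true if the payload is long enough and begins with the single-object marker. */
    public static boolean hasSingleObjectHeader(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_LENGTH
                && payload[0] == MARKER_0
                && payload[1] == MARKER_1;
    }

    public static void main(String[] args) {
        // Hypothetical payload starting with the two bytes reported in the exception.
        byte[] rawBinary = {0x00, 0x08, 0x74, 0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00};

        // Hypothetical payload with the marker, a zeroed fingerprint, and one body byte.
        byte[] singleObject = new byte[HEADER_LENGTH + 1];
        singleObject[0] = MARKER_0;
        singleObject[1] = MARKER_1;

        System.out.println(hasSingleObjectHeader(rawBinary));    // prints "false"
        System.out.println(hasSingleObjectHeader(singleObject)); // prints "true"
    }
}
```

In a test, a check like this can tell you up front whether BinaryMessageDecoder or a DatumReader with a binary decoder is the right tool for the bytes on the channel.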