I am trying to write a test case for my Spring Cloud Stream application. I am using Confluent Schema Registry with Avro, so I need to decode the message after polling from the channel. Here is my code:
processor.input()
.send(MessageBuilder.withPayload(InputData).build());
Message<?> message = messageCollector.forChannel(processor.output()).poll();
BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
OutputData outputObject = decoder.decode((byte[]) message.getPayload());
For some reason this code throws
org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x08
I am not sure if this is some sort of bug I am facing or I am not following a proper way to decode the received avro message. I suspect I need to set header with something, but I am not quite sure how and with what exactly. I would appreciate it if someone could help me with this matter.
P.S: I am using spring-cloud-stream-test-support
for the purpose of this test.
It turns out that the issue was related to how I was trying to decode the Avro message. By using the official Avro libraries, the following code worked for me:
Decoder decoder = DecoderFactory.get().binaryDecoder((byte[]) message.getPayload(), null);
DatumReader<OutputData> reader = new SpecificDatumReader<>(OutputData.getClassSchema());
RawDataCapsule rawDataCapsule = reader.read(null , decoder);