Node.js Avro serialization without a schema registry, followed by deserialization in Kafka Streams

I would like some guidance with the following problem. I'm trying to learn how to serialize Avro data in Node.js without a schema registry, publish it to a Kafka cluster, and then consume it in Kafka Streams (Java).

On the JavaScript side, I tried using kafka-node together with avsc for the serialization. In Kafka Streams I decided to implement a custom Serde since, as far as I understand, the Avro Serdes commonly used with the Streams API (Confluent's) are designed to fetch the schemas directly from the Schema Registry.

Here's a JavaScript snippet for a simple producer:

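const avro = require('avsc');
const kafka = require('kafka-node');

// Broker address is an assumption; adjust it to your cluster
const client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
const producer = new kafka.Producer(client);

const messageKey = "1";

const schemaType = avro.Type.forSchema({
    type: "record",
    name: "product",
    fields: [
        {name: "id", type: "int"},
        {name: "name", type: "string"},
        {name: "price", type: "double"},
        {name: "stock", type: "int"}
    ]
});

// id, name, stock and price hold the product's values (defined elsewhere)
// toBuffer() returns the raw Avro binary encoding of the record
const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.on('ready', () => producer.send(payload, sendCallback));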
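It may help to see what those bytes actually look like. avsc's toBuffer() writes the Avro binary encoding of the single value: no header, no schema, no field names, just the fields back to back in schema order. The following is a dependency-free Java sketch that hand-encodes the same product record following the binary encoding rules in the Avro specification (zig-zag varint for int, length-prefixed UTF-8 for string, 8-byte little-endian double). ProductEncoding is a hypothetical name for illustration only; it is not a replacement for avsc or Avro's own writers, and it assumes exactly the schema shown above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/**
 * Illustration only (hypothetical class): hand-encodes one "product" record using
 * Avro's binary encoding. Fields are written in schema order (id, name, price, stock)
 * with no header, no schema and no field names.
 */
final class ProductEncoding {

    // Avro int/long: zig-zag encode, then emit 7 bits per byte, low bits first.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Avro string: byte length as a zig-zag varint, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Avro double: 8 bytes, IEEE 754, little-endian.
    static void writeDouble(ByteArrayOutputStream out, double d) {
        byte[] le = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(d).array();
        out.write(le, 0, le.length);
    }

    static byte[] encode(int id, String name, double price, int stock) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, name);
        writeDouble(out, price);
        writeLong(out, stock);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(1, "a", 1.0, 2)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints: 02 02 61 00 00 00 00 00 00 f0 3f 04
    }
}
```

Note that the field order comes from the schema, not from the order of the properties in the JavaScript object literal, and that nothing in the output identifies it as Avro.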

And here's how I'm currently trying to implement the deserializer:

// schema and genericRecordToObject(...) are defined elsewhere in ProductDeserializer
public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputstream = new SeekableByteArrayInput(data);

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

    DataFileReader<GenericRecord> dataFileReader;

    Product product = null;
    try {
        // This is the line that throws (see the stack trace below)
        dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
        GenericRecord record = new GenericData.Record(schema);

        while (dataFileReader.hasNext()) {
            record = dataFileReader.next(record);
            product = genericRecordToObject(record, new Product());
        }
        dataFileReader.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

    return product;
}

However, when the Streams application tries to deserialize the data, it fails with the following error, thrown at the line where the DataFileReader is instantiated:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
    at org.apache.avro.file.DataFileStream.initialize(
    at org.apache.avro.file.DataFileReader.<init>(
    at org.apache.avro.file.DataFileReader.<init>(
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
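For context on this exception: DataFileReader (through DataFileStream.initialize) expects an Avro object container file, which per the Avro specification begins with four magic bytes, ASCII 'O', 'b', 'j' followed by the byte 1, and then the file metadata (including the schema) and a sync marker. A raw datum such as the output of avsc's toBuffer() has none of that, so the check fails on the first bytes. A quick, dependency-free way to tell which kind of payload you received is to test for that prefix. The helper below is a sketch of that check; AvroMagic.isContainerFile is a hypothetical name, not part of the Avro API.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: reports whether a payload starts with the Avro object container
 * file magic ('O', 'b', 'j', 1). DataFileReader/DataFileStream require this header and
 * throw InvalidAvroMagicException when it is missing.
 */
final class AvroMagic {
    // Same four bytes as org.apache.avro.file.DataFileConstants.MAGIC (format version 1)
    private static final byte[] CONTAINER_MAGIC = {'O', 'b', 'j', 1};

    static boolean isContainerFile(byte[] data) {
        return data != null
                && data.length >= CONTAINER_MAGIC.length
                && Arrays.equals(Arrays.copyOf(data, CONTAINER_MAGIC.length), CONTAINER_MAGIC);
    }

    public static void main(String[] args) {
        byte[] rawDatum = {0x02, 0x02, 0x61};          // start of a raw binary-encoded record
        byte[] containerStart = {'O', 'b', 'j', 1, 0x04}; // start of a container file
        System.out.println(isContainerFile(rawDatum));       // prints: false
        System.out.println(isContainerFile(containerStart)); // prints: true
    }
}
```

If this returns false for the bytes coming off the topic, the payload needs a plain binary decoder rather than DataFileReader.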

I'm at a loss as to how to proceed. Any advice would be appreciated.


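  • Maybe I'm wrong, but I think you shouldn't use DataFileReader, only the DatumReader. DataFileReader is meant for Avro object container files, which start with a header (the magic bytes "Obj" plus a version byte, followed by the schema and a sync marker); avsc's toBuffer() writes only the binary-encoded record, so there is no header to find and you get "Not an Avro data file".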

    I did something similar with a plain Kafka consumer (not Kafka Streams); maybe it can give you some ideas:

    The full example (very simple) is here:

    As you can see, I didn't create a custom Serde; I just deserialize the byte[] value into a GenericRecord using a DatumReader and a BinaryDecoder.

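    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Inside the consumer class; getConsumerProperties() is defined in the full example
    public static void main(String[] args) {
        final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
        consumer.subscribe(Collections.singletonList("test_topic")); // hypothetical topic name
        ConsumerRecords<String, byte[]> consumerRecords;
        String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
                + "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
                + "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
                + "]}";
        Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
        // Plain DatumReader over raw binary: no container-file header is expected
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroValueSchema);
        try {
            while (true) {
                consumerRecords = consumer.poll(Duration.ofMillis(1000));
                consumerRecords.forEach(record -> {
                    ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                    GenericRecord deserializedValue = null;
                    try {
                        deserializedValue = datumReader.read(null, binaryDecoder);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }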

    I hope it helps.
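To make the answer's point concrete without any Avro or Kafka dependencies, here is a sketch of what a plain binary decode amounts to for the question's product schema: read the fields in schema order (zig-zag varint int, length-prefixed UTF-8 string, 8-byte little-endian double, varint int), with no header to skip. ProductDecoding and its Product class are hypothetical names used only for illustration, and the sketch assumes the writer used exactly that schema. In real code, GenericDatumReader with DecoderFactory.get().binaryDecoder(...), as in the answer, does this for you and also supports schema resolution.

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Illustration only (hypothetical class): decodes one raw "product" datum, no container header. */
final class ProductDecoding {

    /** Hypothetical value class mirroring the question's Product. */
    static final class Product {
        int id;
        String name;
        double price;
        int stock;
    }

    // Reads an Avro int/long: little-endian base-128 varint, then undoes zig-zag.
    static long readLong(ByteArrayInputStream in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            if (shift > 63) throw new IllegalArgumentException("varint too long");
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("truncated varint");
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Reads exactly len bytes or fails.
    static byte[] readFully(ByteArrayInputStream in, int len) {
        byte[] buf = new byte[len];
        if (len > 0 && in.read(buf, 0, len) != len) {
            throw new IllegalArgumentException("truncated field");
        }
        return buf;
    }

    static Product decode(byte[] data) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        Product p = new Product();
        p.id = (int) readLong(in);
        long len = readLong(in);
        if (len < 0 || len > Integer.MAX_VALUE) throw new IllegalArgumentException("bad string length");
        p.name = new String(readFully(in, (int) len), StandardCharsets.UTF_8);
        p.price = ByteBuffer.wrap(readFully(in, 8)).order(ByteOrder.LITTLE_ENDIAN).getDouble();
        p.stock = (int) readLong(in);
        return p;
    }

    public static void main(String[] args) {
        // Bytes of {id: 1, name: "a", price: 1.0, stock: 2} in Avro binary encoding
        byte[] bytes = {0x02, 0x02, 0x61, 0, 0, 0, 0, 0, 0, (byte) 0xF0, 0x3F, 0x04};
        Product p = decode(bytes);
        System.out.println(p.id + " " + p.name + " " + p.price + " " + p.stock); // prints: 1 a 1.0 2
    }
}
```

The same decode step (ideally using Avro's DatumReader rather than this hand-rolled version) could be wrapped in your own Deserializer and combined with a Serializer via Serdes.serdeFrom, so the Streams topology reads Product values directly; that wiring is not shown here.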