
Node.js Avro serialization without a schema registry, followed by deserialization in Kafka Streams


I would like to ask for some guidance with the following problem. I'm trying to learn how to serialize Avro data in Node.js without a schema registry, publish it to a Kafka cluster, and then consume it in Kafka Streams (Java).

On the JavaScript side, I tried using kafka-node along with avsc for the serialization. In Kafka Streams I decided to implement a custom Serde since, as far as I understand, the ready-made Avro Serdes are designed to fetch the schemas directly from the Schema Registry.
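
For context, the custom Serde itself is just a thin wrapper that hands Kafka Streams a serializer and a deserializer; all of the Avro work happens inside those two classes. A trimmed-down sketch (the ProductSerializer name is only a placeholder here, and the deserializer is the method shown further down):

public class ProductAvroSerde implements Serde<Product> {

    @Override
    public Serializer<Product> serializer() {
        return new ProductSerializer();   // writes a Product as Avro bytes (not shown)
    }

    @Override
    public Deserializer<Product> deserializer() {
        return new ProductDeserializer(); // contains the deserialize(...) method shown below
    }
}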

Here's a JavaScript snippet for a simple producer:

const avro = require('avsc');
const kafka = require('kafka-node');

// Broker address and the sample product values below are just placeholders.
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
const producer = new kafka.Producer(client);

const messageKey = "1";

const schemaType = avro.Type.forSchema({
    type: "record",
    name: "product",
    fields: [
        { name: "id", type: "int" },
        { name: "name", type: "string" },
        { name: "price", type: "double" },
        { name: "stock", type: "int" }
    ]
});

function sendCallback(err, data) {
    if (err) console.error(err);
    else console.log(data);
}

producer.on('ready', () => {
    // toBuffer() encodes a single record using the schema above.
    const messageValueBuffer = schemaType.toBuffer({ id: 1, name: "vanilla ice cream", price: 2.5, stock: 42 });
    const payload = [{ topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0 }];
    producer.send(payload, sendCallback);
});

And here's how I would currently try to implement the deserializer:

public Product deserialize(String topic, byte[] data) {
    SeekableByteArrayInput inputStream = new SeekableByteArrayInput(data);

    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

    DataFileReader<GenericRecord> dataFileReader;

    Product product = null;
    try {
        dataFileReader = new DataFileReader<GenericRecord>(inputStream, datumReader);
        GenericRecord record = null;

        while (dataFileReader.hasNext()) {
            record = dataFileReader.next(record);
            product = genericRecordToObject(record, new Product());
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
    return product;
}

When the Streams application attempts to deserialize the data, however, I get the following error, thrown at the line where the DataFileReader is instantiated:

org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
    at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
    at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
    at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

I'm at a loss as to how to proceed. Any advice would be appreciated.


Solution

  • Maybe I'm wrong, but I think you shouldn't use DataFileReader, only the DatumReader (a sketch of how that looks in your Streams deserializer is at the end of this answer).

    I did something similar in plain Kafka (not Kafka Streams); maybe it can give you some ideas.

    The full example (very simple) is here: https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java

    As you can see, I didn't create a custom Deserializer class; I consume the value as bytes and deserialize it into a GenericRecord myself.

public static void main(String[] args) {
    final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
    consumer.subscribe(Collections.singleton(TOPIC));
    ConsumerRecords<String, byte[]> consumerRecords;

    String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
            + "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
            + "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
            + "]}";
    Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroValueSchema);

    try {
        while (true) {
            consumerRecords = consumer.poll(1000);

            consumerRecords.forEach(record -> {
                // The value bytes are the plain Avro binary encoding, so a BinaryDecoder is enough.
                ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
                GenericRecord deserializedValue = null;
                try {
                    deserializedValue = datumReader.read(null, binaryDecoder);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
            });

            consumer.commitAsync();
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
        System.out.println("DONE");
    }
}

    I hope it helps.
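
    Applied to your Streams deserializer, the same idea means dropping the DataFileReader entirely and decoding the raw bytes with a BinaryDecoder. A minimal sketch, assuming the schema field and the genericRecordToObject helper from your question:

    public Product deserialize(String topic, byte[] data) {
        // avsc's toBuffer() writes the plain Avro binary encoding of a single record
        // (no container-file header), so read it back with a BinaryDecoder.
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        try {
            GenericRecord record = datumReader.read(null, decoder);
            return genericRecordToObject(record, new Product());
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }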