I would like to ask for some guidance with the following problem. I'm trying to learn how to serialize Avro data in Node.js without a Schema Registry, publish it to a Kafka cluster, and then read it back in Kafka Streams (Java).
On the JavaScript side, I tried using kafka-node together with avsc for the serialization. In Kafka Streams I decided to implement a custom Serde since, as far as I understand, the Avro Serdes provided by the Streams API are designed to fetch schemas directly from the Schema Registry.
Here's a JavaScript code snippet for a simple producer:
const avro = require('avsc');
const messageKey = "1";
const schemaType = avro.Type.forSchema({
type: "record",
name: "product",
fields: [
{
name: "id",
type: "int"
},
{
name: "name",
type: "string"
},
{
name: "price",
type: "double"
},
{
name: "stock",
type: "int"
}
]
});
const messageValueBuffer = schemaType.toBuffer({id, name, stock, price});
const payload = [{topic: 'product', key: messageKey, messages: messageValueBuffer, partition: 0}];
producer.send(payload, sendCallback);
And here's how I would currently try to implement the deserializer:
public Product deserialize(String topic, byte[] data) {
SeekableByteArrayInput inputstream = new SeekableByteArrayInput(data);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader;
Product product = null;
try {
dataFileReader = new DataFileReader<GenericRecord>(inputstream, datumReader);
GenericRecord record = new GenericData.Record(schema);
while(dataFileReader.hasNext()) {
dataFileReader.next();
product = genericRecordToObject(record, new Product());
}
} catch (IOException e) {
e.printStackTrace();
}
return product;
}
When the streams application attempts to deserialize the data, however, I am faced with the following error, specifically at the line of code where the DataFileReader gets instantiated:
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:138)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:128)
at myapps.ProductAvroSerde$ProductDeserializer.deserialize(ProductAvroSerde.java:1)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:168)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:109)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:156)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:808)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:925)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:763)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
I'm at a loss as to how to proceed. Any advice would be appreciated.
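I may be wrong, but I think you shouldn't use DataFileReader here, only a DatumReader. DataFileReader expects an Avro object container file, which starts with the magic bytes "Obj" followed by 0x01 and carries the schema in its header. avsc's toBuffer writes only the binary-encoded record, with no header at all, which would explain the "Not an Avro data file" error.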
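To see why the payload is rejected, it can help to look at the raw bytes. The sketch below uses only the JDK and assumes the message value is the plain Avro binary encoding of the "product" record from the question (zig-zag varint for int, length-prefixed UTF-8 for string, 8-byte little-endian for double, fields in schema order). The class name RawAvroSketch and the sample bytes are made up for illustration; it is not meant to replace the Avro library.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

class RawAvroSketch {
    // Magic bytes that open every Avro object container file: 'O', 'b', 'j', 1.
    static final byte[] CONTAINER_MAGIC = {'O', 'b', 'j', 1};

    /** Returns true only if the payload starts with the container-file magic. */
    static boolean looksLikeContainerFile(byte[] data) {
        if (data.length < CONTAINER_MAGIC.length) {
            return false;
        }
        for (int i = 0; i < CONTAINER_MAGIC.length; i++) {
            if (data[i] != CONTAINER_MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    /** Reads one zig-zag encoded varint (how Avro stores int and long). */
    static long readZigZag(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Decodes a raw "product" datum: id, name, price, stock in schema order. */
    static String decodeProduct(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        int id = (int) readZigZag(buf);
        byte[] nameBytes = new byte[(int) readZigZag(buf)];
        buf.get(nameBytes);
        String name = new String(nameBytes, StandardCharsets.UTF_8);
        double price = buf.getDouble();
        int stock = (int) readZigZag(buf);
        return id + "," + name + "," + price + "," + stock;
    }

    public static void main(String[] args) {
        // Hand-built sample value: id=1, name="ab", price=1.5, stock=3.
        byte[] sample = {0x02, 0x04, 'a', 'b', 0, 0, 0, 0, 0, 0, (byte) 0xF8, 0x3F, 0x06};
        System.out.println(looksLikeContainerFile(sample));
        System.out.println(decodeProduct(sample));
    }
}
```

The first check is essentially what DataFileReader does before anything else: a raw datum begins directly with the first field, so the magic test fails and the reader gives up.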
I did something similar with a plain Kafka consumer (not Kafka Streams); maybe it can give you some ideas.
The full (very simple) example is here: https://github.com/anigmo97/KafkaRecipes/blob/master/java/consumers/StringKeyAvroValueConsumers/StandardAvro/StandardAvroConsumer.java
As you can see, I didn't write a custom Deserializer; I consume the value as a byte[] and decode it into a GenericRecord myself.
public static void main(String[] args) {
final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(getConsumerProperties());
consumer.subscribe(Collections.singleton(TOPIC));
ConsumerRecords<String, byte[]> consumerRecords;
String valueSchemaString = "{\"type\": \"record\",\"namespace\": \"example.avro\",\"name\": \"test_record\","
+ "\"fields\":[" + "{\"name\": \"id\",\"type\": \"int\"},"
+ "{\"name\": \"date\",\"type\": [\"int\", \"null\"]}," + "{\"name\": \"info\",\"type\": \"string\"}"
+ "]}"; // closes the fields array and the record
Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
// GenericDatumReader is the natural fit when decoding into GenericRecord
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(avroValueSchema);
try {
while (true) {
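// poll(long) is deprecated since Kafka 2.0; the Duration overload replaces it
consumerRecords = consumer.poll(java.time.Duration.ofMillis(1000));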
consumerRecords.forEach(record -> {
ByteArrayInputStream inputStream = new ByteArrayInputStream(record.value());
BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
GenericRecord deserializedValue = null;
try {
deserializedValue = datumReader.read(null, binaryDecoder);
} catch (IOException e) {
// Decoding failed (e.g. the bytes do not match the schema); the value stays null
e.printStackTrace();
}
System.out.printf("Consumer Record:(%s, %s)\n", record.key(), deserializedValue);
});
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
System.out.println("DONE");
}
}
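Adapted to your Kafka Streams case, the same idea could be wrapped in a Deserializer roughly as sketched below. This is only a sketch under assumptions: RawAvroDeserializer is a name I made up, it returns a GenericRecord rather than your Product, and it assumes the schema passed in is the same one the Node.js producer used. It needs the Avro and kafka-clients libraries on the classpath.

```java
import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical helper: decodes raw Avro binary (no container-file header) into a GenericRecord.
public final class RawAvroDeserializer implements Deserializer<GenericRecord> {
    private final GenericDatumReader<GenericRecord> datumReader;

    public RawAvroDeserializer(Schema schema) {
        // Writer and reader schema are assumed to be identical here.
        this.datumReader = new GenericDatumReader<>(schema);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure; the schema is supplied through the constructor.
    }

    @Override
    public GenericRecord deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // Kafka passes null for tombstone records
        }
        try {
            // BinaryDecoder reads the bare datum; DataFileReader is not involved at all.
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return datumReader.read(null, decoder);
        } catch (IOException e) {
            throw new SerializationException("Could not decode Avro value from topic " + topic, e);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

You would still have to map the GenericRecord to Product (for example with your genericRecordToObject helper) and pair this with a serializer, e.g. via Serdes.serdeFrom(serializer, deserializer), to get a full Serde.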
I hope it helps.