I have a kafka stream with avro message using the confluent.io kafka package. This is working fine for the java applications. But I am now trying to read in these messages in javascript.
I've been attempting to use the kafka-node + avsc packages to decode the messages from a buffer array to string, using the schema. I know that confluent puts the first 5 bytes as a magic byte (0) + 4 bytes for the schema Id.
So I slice the Buffer to remove those bytes and attempt to send this to avsc to decode. But I get an error
return this.buf.utf8Slice(pos, pos + len);
RangeError: out of range index at RangeError (native) at Tap.readString (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\utils.js:452:19) at StringType._read (C:\git\workflowapps\workItemsApp\node_modules\avsc\lib\types.js:612:58)
Also attempting to manually decode this leaves lots of non-utf8 characters and I am losing data that way.
Sample Code:
consumer.on('message', function(message) {
var val = message.value.slice(4);
sails.log.info('val buffer', val, val.length);
sails.log.info('hex',val.toString('hex'));
var type = avro.parse({"type":"record",
"name":"StatusEvent",
"fields":[{"name":"ApplicationUUID","type":"string"},
{"name":"StatusUUID","type":"string"},
{"name":"Name","type":"string"},
{"name":"ChangedBy","type":"string"},
{"name":"ChangedByUUID","type":"string"},
{"name":"ChangedAt","type":"long"}]
});
var decodedValue = type.fromBuffer(val);
sails.log.info('Decoded', decodedValue);
});
Your slice(4)
should be slice(5)
(otherwise you're only skipping 4 out of the 5 header bytes). You might also be able to find helpful information here.