Consume Kafka Avro messages in Go


I'm trying to consume Kafka messages in Avro format, but I can't decode them from Avro to JSON in Go.

I'm using the Confluent Platform (3.0.1). For example, I produce Avro messages like this:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

Now I consume the messages with the Go Kafka library sarama. Plain-text messages work fine, but Avro messages have to be decoded. I found two libraries: github.com/linkedin/goavro and github.com/elodina/go-avro.

But after decoding I get JSON without values (with both libraries):

{"f1":""}

goavro:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
if err != nil {
    log.Fatal(err)
}
log.Printf("%v", decoded)

go-avro:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
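decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
// Decode msg.Value into decodedRecord.
if err := reader.Read(decodedRecord, decoder); err != nil {
    log.Fatal(err)
}
log.Println(decodedRecord.String())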

In both snippets, msg is a sarama.ConsumerMessage.


Solution

  • Just found out (by comparing the binary Avro messages) that I had to strip the first 5 bytes of the message value - now everything works :)

    message = msg.Value[5:]
    

    Maybe someone can explain why
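
On the "why": kafka-avro-console-producer serializes through the Confluent Schema Registry serializer, which puts a 5-byte header in front of every Avro payload: one magic byte (0) followed by a 4-byte big-endian schema ID. A plain Avro decoder does not know about that header and starts reading the record from the wrong offset. The following is a minimal sketch of splitting the header off instead of slicing blindly. splitConfluentFrame is a hypothetical helper name, not part of sarama, goavro, or go-avro, and the sample bytes (including schema ID 1) are hand-built stand-ins for msg.Value, not captured from a real topic.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"log"
)

// splitConfluentFrame is a hypothetical helper. It assumes the Confluent
// wire format: magic byte 0, 4-byte big-endian schema ID, then Avro data.
func splitConfluentFrame(value []byte) (schemaID uint32, payload []byte, err error) {
	const headerLen = 5
	if len(value) < headerLen {
		return 0, nil, errors.New("value shorter than the 5-byte header")
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("unexpected magic byte %d", value[0])
	}
	return binary.BigEndian.Uint32(value[1:headerLen]), value[headerLen:], nil
}

func main() {
	// Stand-in for msg.Value: header with a made-up schema ID 1, then the
	// Avro encoding of {"f1":"message1"}: string length 8 as a zigzag
	// varint (0x10), followed by the UTF-8 bytes.
	value := append([]byte{0, 0, 0, 0, 1, 0x10}, "message1"...)

	schemaID, payload, err := splitConfluentFrame(value)
	if err != nil {
		log.Fatal(err)
	}

	// binary.Varint reads a zigzag varint, the same encoding Avro uses
	// for the length prefix of a string.
	strLen, n := binary.Varint(payload)
	if n <= 0 || int(strLen) != len(payload)-n {
		log.Fatal("payload is not a single Avro string field")
	}
	fmt.Printf("schema ID %d, f1=%s\n", schemaID, payload[n:n+int(strLen)])
}
```

In the consumer, the returned payload would be what gets handed to the Avro decoder in place of msg.Value[5:]. The schema ID could also be used to fetch the writer schema from the Schema Registry rather than hard-coding it, though that lookup is outside this sketch.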