apache-kafka, logstash, avro, apache-kafka-connect, confluent-platform

Sending from Logstash to Kafka with Avro


I am trying to send data from logstash into kafka using an avro schema.

My logstash output looks like:

kafka{
  codec => avro {
    schema_uri => "/tmp/avro/hadoop.avsc"
  }
  topic_id => "hadoop_log_processed"
}

My schema file looks like:

{"type": "record",
 "name": "hadoop_schema",
 "fields": [
     {"name": "loglevel", "type": "string"},
     {"name": "error_msg",  "type": "string"},
     {"name": "syslog", "type": ["string", "null"]},
     {"name": "javaclass", "type": ["string", "null"]}
 ]
}

Output of kafka-console-consumer:

CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy

I am also getting the following error in my connector:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I know that I am encoding the data on the Logstash side. Do I have to decode the messages on the Kafka input side, or can I decode/deserialize the data in the connector config?

Is there a way to disable the encoding on the Logstash side? I read about a base64_encoding option, but it seems the codec doesn't have it.


Solution

  • The problem you have here is that Logstash's Avro codec is not serialising the data into an Avro form that the Confluent Schema Registry Avro deserialiser expects.

    Whilst Logstash takes an avsc and encodes the data into a binary form based on that, the Confluent Schema Registry [de]serialiser instead stores & retrieves a schema directly from the registry (not avsc files).

    So when you get Failed to deserialize data … SerializationException: Unknown magic byte! this is the Avro deserialiser saying that it doesn't recognise the data as Avro that's been serialised using the Schema Registry serialiser.
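
    To see the difference, look at the first bytes of a record: the Schema Registry wire format is a 0x00 magic byte, a 4-byte big-endian schema ID, and then the Avro body, whereas Logstash's avro codec writes base64-encoded raw Avro with no such header (that's the text you're seeing in kafka-console-consumer). A quick illustrative check, as a Python sketch:

        import struct

        def looks_like_confluent_avro(value: bytes) -> bool:
            """Heuristic: does this payload use the Schema Registry wire format?"""
            if len(value) < 5 or value[0] != 0x00:  # first byte must be the 0x00 magic byte
                return False
            schema_id = struct.unpack(">I", value[1:5])[0]  # 4-byte big-endian schema ID
            return schema_id > 0  # registry IDs start at 1

        # The base64 text that Logstash's avro codec produces fails this check,
        # which the deserialiser reports as "Unknown magic byte!".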

    I had a quick Google and found this codec that looks like it supports the Schema Registry (and thus Kafka Connect, and any other consumer deserialising Avro data this way).
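
    For illustration, one community plugin that fits that description is logstash-codec-avro_schema_registry; treat the option names below as assumptions to verify against the plugin's own docs rather than a confirmed config:

        kafka {
          codec => avro_schema_registry {
            endpoint => "http://schema-registry:8081"   # assumed option: the registry URL
          }
          # the codec emits binary, so a byte-array serializer is usually needed
          value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
          topic_id => "hadoop_log_processed"
        }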

    Alternatively, write your data as JSON into Kafka and use the org.apache.kafka.connect.json.JsonConverter in Kafka Connect to read it from the topic.
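
    As a minimal sketch of that route (the sink connector itself stays whatever you're already running): have Logstash write JSON and tell Connect to read it without a schema.

        # Logstash output: plain JSON instead of Avro
        kafka {
          codec => json
          topic_id => "hadoop_log_processed"
        }

        # On the Connect side (worker or connector properties):
        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=false
        key.converter=org.apache.kafka.connect.storage.StringConverter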

    Ref: