Tags: apache-kafka, apache-kafka-connect, confluent-platform

ElasticsearchSinkConnector Failed to deserialize data to Avro


I created the simplest Kafka sink connector config, and I'm using Confluent 4.1.0:

{
  "connector.class": 
  "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "test-type",
  "tasks.max": "1",
  "topics": "dialogs",
  "name": "elasticsearch-sink",
  "key.ignore": "true",
  "connection.url": "http://localhost:9200",
  "schema.ignore": "true"
}
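
For reference, this is roughly how such a config can be registered with the Connect REST API. This is only a sketch: the default REST port 8083 and the file name elasticsearch-sink.json are assumptions, not part of the original setup.

curl -X PUT -H "Content-Type: application/json" \
     --data @elasticsearch-sink.json \
     http://localhost:8083/connectors/elasticsearch-sink/config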

In the topic I save the messages as plain JSON, for example:

{ "topics": "resd"}

But as a result I get this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!


Solution

  • That error happens because the connector is trying to read messages that were not encoded in the Confluent Schema Registry Avro wire format.

    If the topic data really is Avro, it needs to be produced with the Schema Registry serializers.

    Otherwise, if the topic data is plain JSON, then you've started the Connect cluster with AvroConverter set for keys or values in the worker property file, and you need to use JsonConverter there instead, as shown in the sketch below.
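
A minimal sketch of the relevant worker settings (for example in connect-standalone.properties or connect-distributed.properties, depending on how you run Connect), assuming the topic holds bare JSON messages without the Connect schema/payload envelope:

# Worker-level converters: read plain JSON instead of Schema Registry Avro
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# The messages are bare JSON, not the {"schema": ..., "payload": ...} envelope
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Restart the Connect worker after changing these settings. Depending on your Connect version, you can also override the converter for just this connector by adding "value.converter" and "value.converter.schemas.enable" to the connector config itself instead of editing the worker property file.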