Tags: apache-kafka, deserialization, apache-kafka-connect, avro, confluent-schema-registry

Failed to deserialize data for topic "testtopic" to Avro for sink connector


Here are the sink connector properties:

 "name": "sink-testtopic-crdb",
 "config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://roach-single:26257/sampledb?useSSL=false&reWriteBatchedInserts=true",
"topics": "testtopic",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": true,
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"print.key": "true",
"errors.tolerance": "all",
"connection.user": "root",
"connection.password": "",
"table.name.format": "schema_sample.sample_test",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "insert",
"pk.mode": "none",
"pk.fields": "none",
"batch.size": 1,
"consumer.override.max.poll.records": 1}

Here is the schema registered in the Schema Registry for "testtopic", posted to http://localhost:8081/subjects/testtopic-value/versions:

```json
{
  "schema": "{\"type\":\"record\",\"name\":\"sampleTest\",\"fields\":[{\"name\":\"testid\",\"type\":\"string\"},{\"name\":\"testname\",\"type\":\"string\"}]}",
  "compatibility": "BACKWARD"
}
```
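For reference, a registration call like the one described above can be sketched with curl (the registry URL matches the question; the exact command line is an illustration, not taken from the original post):

```shell
# Register the value schema for "testtopic" under the subject "testtopic-value".
# Assumes Schema Registry is reachable at localhost:8081.
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"sampleTest\",\"fields\":[{\"name\":\"testid\",\"type\":\"string\"},{\"name\":\"testname\",\"type\":\"string\"}]}"}' \
  http://localhost:8081/subjects/testtopic-value/versions
```

Note that registering a schema this way only stores it in the registry; it does not change how messages already on the topic were serialized.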

Here is the message I'm producing to the Kafka topic:

```json
{"testid": "226", "testname": "testSample21"}
```

Still, messages are not getting ingested into the database, and in the Kafka Connect logs I can see the error below:

```
Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='testtopic', partition=0, offset=16, timestamp=1691668835004, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic testtopic to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
```


Solution

  • To summarize the comments...

    Use kafka-avro-console-producer or similar tooling to actually produce Avro-serialized records. As written in the question, you've only produced plain JSON strings, and those cannot be decoded by AvroConverter, which expects Confluent's Avro wire format (a magic byte and a schema ID prefixed to the binary Avro payload). Alternatively, you could switch the sink to JsonConverter (org.apache.kafka.connect.json.JsonConverter), but then you need to modify the data you produce so that each record embeds its schema alongside the payload.
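A sketch of the first approach, using the Confluent CLI tooling the answer names (broker and registry addresses are assumptions matching the question's localhost setup):

```shell
# Produce a record that is genuinely Avro-serialized: the console producer
# registers/looks up the schema in the Schema Registry and frames the payload
# in Confluent's wire format, which is what AvroConverter expects to read.
kafka-avro-console-producer \
  --bootstrap-server localhost:9092 \
  --topic testtopic \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"sampleTest","fields":[{"name":"testid","type":"string"},{"name":"testname","type":"string"}]}'
# Then type each record as a JSON line on stdin, e.g.:
# {"testid": "226", "testname": "testSample21"}
```

If you instead go the JsonConverter route with value.converter.schemas.enable=true, each message must be an envelope of the form {"schema": {...}, "payload": {...}} rather than the bare JSON object shown in the question.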