Here is the sink connector properties
"name": "sink-testtopic-crdb",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://roach-single:26257/sampledb?useSSL=false&reWriteBatchedInserts=true",
"topics": "testtopic",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": true,
"config.action.reload": "restart",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"print.key": "true",
"errors.tolerance": "all",
"connection.user": "root",
"connection.password": "",
"table.name.format": "schema_sample.sample_test",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "insert",
"pk.mode": "none",
"pk.fields": "none",
"batch.size": 1,
"consumer.override.max.poll.records": 1}
Here is the schema registered in schema registry for "testtopic" by posting it to the url "http://localhost:8081/subjects/testtopic-value/versions"
{
"schema": "{\"type\":\"record\",\"name\":\"sampleTest\",\"fields\":[{\"name\":\"testid\",\"type\":\"string\"},{\"name\":\"testname\",\"type\":\"string\"}]}",
"compatibility": "BACKWARD"}
Here is the message I'm inserting to Kafka Topic
{"testid": "226", "testname": "testSample21"}
Still messages are not getting ingested to ksql db and in the kafka connect logs, I could see the below error
''' Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='testtopic', partition=0, offset=16, timestamp=1691668835004, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter:66) org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic testtopic to Avro: at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
To summarize the comments...
Use kafka-avro-console-producer
or similar tooling to actually produce Avro. As written in the question, you've only produced JSON strings. This cannot work with AvroConverter
. You could use JSONConverter
instead, but then you need to modify the data you've produced to include a schema
as part of the record.