I am having an issue with the JDBC sink connector consuming a topic created by KSQL. Below are the options I have tried to make it work, along with the error I get:
[2023-02-07 07:20:27,821] ERROR WorkerSinkTask{id=oracle-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic MY_EMPLOYEE:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:180)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:235)
at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:115)
... 20 more
[2023-02-07 07:20:27,822] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 13 more
I followed a few articles and videos addressing similar issues, but none of them worked.
My connector configurations and the topics I am trying to sink into the Oracle database are given below:
{
  "name": "destination-connector-simple",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "MY_STREAM1",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "table.name.format": "kafka_customers",
    "auto.create": "true",
    "key.ignore": "true",
    "pk.mode": "none",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
{
  "name": "oracle-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "MY_EMPLOYEE",
    "table.name.format": "kafka_customers",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "auto.create": true,
    "auto.evolve": true,
    "pk.fields": "ID",
    "insert.mode": "upsert",
    "delete.enabled": true,
    "delete.retention.ms": 100,
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
Topics to be consumed by the sink (any one of them would be okay):
ksql> print 'MY_STREAM1' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID":"101","NAME":"Dhruv","LNAME":"S","L_ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID":"102","NAME":"Dhruv1","LNAME":"S1","L_ADD_ID":"202"}, partition: 0
ksql> print 'MY_EMPLOYEE' from beginning;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: 101, value: {"EID":"101","NAME":"Dhruv","LNAME":"S","ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: 102, value: {"EID":"102","NAME":"Dhruv1","LNAME":"S1","ADD_ID":"202"}, partition: 0
ksql> print 'E_SCHEMA' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/06 20:01:25.824 Z, key: , value: {"SCHEMA":{"TYPE":"struct","FIELDS":[{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_EID"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"NAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"LAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_ADD_ID"}],"OPTIONAL":false,"NAME":""},"PAYLOAD":{"L_EID":"201","NAME":"Vishuddha","LNAME":"Sh","L_ADD_ID":"401"}}, partition: 0
ksql> print 'MY_STREAM_AVRO' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID": "102", "NAME": "Dhruv1", "LNAME": "S1", "L_ADD_ID": "202"}, partition: 0
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID": "102", "NAME": "Dhruv1", "LNAME": "S1", "L_ADD_ID": "202"}, partition: 0
Could you please help me complete my POC in time?
With Value format: JSON or KAFKA_STRING, you need "value.converter.schemas.enable": "true", and, as the error says, your JSON needs "schema" and "payload" fields (and no additional fields), but not uppercased. Refer to https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
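For reference, a value that JsonConverter with schemas.enable=true can read would look roughly like this (a sketch derived from the record already in your E_SCHEMA topic, with lowercase envelope and attribute keys and the field types adjusted to match the string values; not something I have run against your setup):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "type": "string", "optional": false, "field": "L_EID" },
      { "type": "string", "optional": false, "field": "NAME" },
      { "type": "string", "optional": false, "field": "LNAME" },
      { "type": "string", "optional": false, "field": "L_ADD_ID" }
    ]
  },
  "payload": { "L_EID": "201", "NAME": "Vishuddha", "LNAME": "Sh", "L_ADD_ID": "401" }
}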
In order to use JsonSchemaConverter, you would need to produce the data using the Schema Registry and the JSON Schema serializer. Refer to the ksqlDB docs on VALUE_FORMAT = JSON_SR: https://docs.ksqldb.io/en/latest/reference/serialization/#json
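For example, you could let ksqlDB re-serialize the existing JSON data with the JSON Schema serializer (a sketch; MY_EMPLOYEE_SRC and MY_EMPLOYEE_JSON_SR are names I made up, and the VARCHAR column types are guessed from the printed string values):

-- Declare a stream over the existing MY_EMPLOYEE topic (plain JSON values, string key)
CREATE STREAM MY_EMPLOYEE_SRC (EID VARCHAR KEY, NAME VARCHAR, LNAME VARCHAR, ADD_ID VARCHAR)
  WITH (KAFKA_TOPIC='MY_EMPLOYEE', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

-- Write the same rows to a new topic as JSON_SR, which registers a schema in Schema Registry
CREATE STREAM MY_EMPLOYEE_JSON_SR
  WITH (KAFKA_TOPIC='MY_EMPLOYEE_JSON_SR', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON_SR') AS
  SELECT * FROM MY_EMPLOYEE_SRC;

The sink connector's "topics" would then point at MY_EMPLOYEE_JSON_SR, and JsonSchemaConverter should be able to deserialize it.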
Similarly for Avro, but with AvroConverter and VALUE_FORMAT = AVRO.
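Since you already have MY_STREAM_AVRO, the converter part of the sink config would look roughly like this (a sketch, assuming that topic was created by ksqlDB with VALUE_FORMAT='AVRO' so its values really are Schema Registry Avro; the key is left as a plain string because the topic shows no key data):

"topics": "MY_STREAM_AVRO",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"pk.mode": "none"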
You can ignore the keys, but the same concept applies. Ideally, you would just have plain string/integer keys (no schemas), so only set key.converter to the respective class for those.
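For the MY_EMPLOYEE topic, where the key is a plain string like 101, that would mean something along these lines (a sketch; with a primitive key and pk.mode=record_key, pk.fields supplies the column name to use, here ID as in your config):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"pk.mode": "record_key",
"pk.fields": "ID"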
In any case, JDBC sink will not accept plain JSON. You need a schema.