
Kafka connect - JDBC sink connectivity with KSQL


I am having an issue getting the JDBC sink connector to consume topics created by ksqlDB.

I have tried the following options to make it work:

  1. with a key and without a key
  2. with Schema Registry and with a schema created manually
  3. with Avro and with JSON

I am facing two types of errors:

  1. With scenario 3, the error looks like this:
[2023-02-07 07:20:27,821] ERROR WorkerSinkTask{id=oracle-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic MY_EMPLOYEE:
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:180)
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:235)
        at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
        ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)
        at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:115)
        ... 20 more
[2023-02-07 07:20:27,822] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
  2. With scenarios 1 and 2, the error says:
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
        ... 13 more

I followed a few articles and videos about similar issues, but none of them worked. Reference articles:

  1. https://forum.confluent.io/t/ksqldb-and-the-kafka-connect-jdbc-sink/187
  2. https://github.com/confluentinc/ksql/issues/3487

My configurations and topics are given below. I am trying to sink them into an Oracle database:

  1. Scenario without a key:
{
  "name": "destination-connector-simple",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "MY_STREAM1",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "table.name.format": "kafka_customers",
    "auto.create": "true",
    "key.ignore": "true",
    "pk.mode": "none",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
  2. Scenario with a key:
{
  "name": "oracle-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "MY_EMPLOYEE",
    "table.name.format": "kafka_customers",
    "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
    "connection.user": "c__sinkuser",
    "connection.password": "sinkpw",
    "auto.create": true,
    "auto.evolve": true,
    "pk.fields": "ID",
    "insert.mode": "upsert",
    "delete.enabled": true,
    "delete.retention.ms": 100,
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Topics to be consumed by the sink (any one of them would be okay):

  1. Without a key:
print 'MY_STREAM1' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID":"101","NAME":"Dhruv","LNAME":"S","L_ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID":"102","NAME":"Dhruv1","LNAME":"S1","L_ADD_ID":"202"}, partition: 0
  2. Topic with a key:
ksql> print 'MY_EMPLOYEE' from beginning;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: 101, value: {"EID":"101","NAME":"Dhruv","LNAME":"S","ADD_ID":"201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: 102, value: {"EID":"102","NAME":"Dhruv1","LNAME":"S1","ADD_ID":"202"}, partition: 0
  3. Topic with a schema (manually created):
ksql> print 'E_SCHEMA' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: JSON or KAFKA_STRING
rowtime: 2023/02/06 20:01:25.824 Z, key: , value: {"SCHEMA":{"TYPE":"struct","FIELDS":[{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_EID"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"NAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"LAME"},{"TYPE":"int32","OPTIONAL":false,"FIELD":"L_ADD_ID"}],"OPTIONAL":false,"NAME":""},"PAYLOAD":{"L_EID":"201","NAME":"Vishuddha","LNAME":"Sh","L_ADD_ID":"401"}}, partition: 0
  4. Topic with Avro:
ksql> print 'MY_STREAM_AVRO' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID": "102", "NAME": "Dhruv1", "LNAME": "S1", "L_ADD_ID": "202"}, partition: 0
rowtime: 2023/02/05 18:16:16.553 Z, key: , value: {"L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201"}, partition: 0
rowtime: 2023/02/05 18:16:16.554 Z, key: , value: {"L_EID": "102", "NAME": "Dhruv1", "LNAME": "S1", "L_ADD_ID": "202"}, partition: 0

Could you please help me complete my POC in time?


Solution

  • With

    Value format: JSON or KAFKA_STRING

    you need

    "value.converter.schemas.enable": "true",

    And, as the error says, your JSON values need "schema" and "payload" fields, lowercased, not the uppercased "SCHEMA" and "PAYLOAD" you used in the E_SCHEMA topic. Refer to https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
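
    For reference, a minimal sketch of what one JSON value would have to look like for JsonConverter with schemas.enable=true; the field names and types here are assumptions based on your MY_STREAM1 records:

    {
      "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
          { "field": "L_EID", "type": "string", "optional": true },
          { "field": "NAME", "type": "string", "optional": true },
          { "field": "LNAME", "type": "string", "optional": true },
          { "field": "L_ADD_ID", "type": "string", "optional": true }
        ]
      },
      "payload": { "L_EID": "101", "NAME": "Dhruv", "LNAME": "S", "L_ADD_ID": "201" }
    }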


    In order to use JsonSchemaConverter, you would need to produce the data with the Schema Registry and the JSON Schema serializer. In ksqlDB, that means writing the topic with VALUE_FORMAT = 'JSON_SR'. See the ksqlDB serialization docs:

    https://docs.ksqldb.io/en/latest/reference/serialization/#json
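
    For example, a sketch of re-serializing your data with JSON Schema, assuming MY_STREAM1 is a ksqlDB stream over that topic (the new stream and topic names are illustrative):

    CREATE STREAM MY_STREAM1_JSON_SR
      WITH (KAFKA_TOPIC='MY_STREAM1_JSON_SR', VALUE_FORMAT='JSON_SR') AS
      SELECT * FROM MY_STREAM1;

    The sink can then keep "value.converter": "io.confluent.connect.json.JsonSchemaConverter" and point "topics" at MY_STREAM1_JSON_SR.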

    The same applies to Avro, but with io.confluent.connect.avro.AvroConverter and VALUE_FORMAT = 'AVRO'.
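
    A sketch of the Avro variant, under the same assumption about MY_STREAM1:

    CREATE STREAM MY_STREAM1_AVRO
      WITH (KAFKA_TOPIC='MY_STREAM1_AVRO', VALUE_FORMAT='AVRO') AS
      SELECT * FROM MY_STREAM1;

    with the sink using "value.converter": "io.confluent.connect.avro.AvroConverter" and "value.converter.schema.registry.url": "http://schema-registry:8081".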

    You can ignore the keys, but the same concept applies to them. Ideally, you just have plain string/integer keys (no schemas), so set key.converter to the matching class for those (for example org.apache.kafka.connect.storage.StringConverter) rather than JsonSchemaConverter.


    In any case, the JDBC sink will not accept plain, schemaless JSON. You need a schema.
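
    Putting it together, a minimal sketch of a sink config against the JSON_SR topic above; the topic name, string key, and pk.mode are assumptions carried over from your earlier attempts:

    {
      "name": "oracle-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "MY_STREAM1_JSON_SR",
        "table.name.format": "kafka_customers",
        "connection.url": "jdbc:oracle:thin:@oracle21:1521/orclpdb1",
        "connection.user": "c__sinkuser",
        "connection.password": "sinkpw",
        "auto.create": "true",
        "pk.mode": "none",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
      }
    }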