apache-kafka, apache-kafka-connect, ksqldb

Dump table from Kafka into MariaDB with KSQL


I am doing aggregation with KSQL and need to persist the output table in MariaDB. I have already set up MariaDB and the JdbcSinkConnector. Unfortunately, the sink just won't work for me.

This is the table's structure in KSQL, which I would like to dump in MariaDB:

Field     | Type                           
--------------------------------------------
 a        | VARCHAR(STRING)  (primary key) 
 b        | VARCHAR(STRING)  (primary key) 
 c        | VARCHAR(STRING)  (primary key) 
 d        | INTEGER                        
--------------------------------------------

I am grouping by the columns a, b, and c and computing an aggregation, which ends up in column d.
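
For context, the table is created by a statement roughly like the one below; the stream name and the COUNT aggregate are simplified placeholders for my actual query:

-- Simplified version of the aggregation that produces the table:
-- group by a, b, c and aggregate into d
-- (my_stream and COUNT(*) stand in for the real source and aggregate)
create table my_table as
  select a, b, c, count(*) as d
  from my_stream
  group by a, b, c;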

This is the connector:

create sink connector test with (
  'tasks.max'                           = 1,
  'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
  'value.converter.schema.registry.url' = 'http://schema-registry:8081',
  'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'key.converter'                       = 'org.apache.kafka.connect.storage.StringConverter',
  'value.converter'                     = 'org.apache.kafka.connect.storage.StringConverter',
  'key.converter.schemas.enable'        = 'false',
  'value.converter.schemas.enable'      = 'true',
  'config.action.reload'                = 'restart',
  'errors.log.enable'                   = 'true',
  'errors.log.include.messages'         = 'true',
  'print.key'                           = 'true',
  'errors.tolerance'                    = 'all',
  'topics'                              = 'my-topic',
  'connection.url'                      = 'jdbc:mysql://mariadb-docker-container:3306/ksql?autoReconnect=true&useSSL=false',
  'connection.user'                     = 'root',
  'connection.password'                 = 'strongest-password-you-have-ever-seen',
  'pk.fields'                           = 'a, b, c',
  'pk.mode'                             = 'record_value',
  'delete.enabled'                      = 'false');
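
As an aside, the connector and task state (including any task error trace) can also be checked from within ksqlDB:

-- Show the connector state, its tasks, and any error trace
DESCRIBE CONNECTOR test;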

Running this connector gives me the following error:

kafka-connect               | [2021-03-08 12:13:51,339] ERROR [TEST|task-0] WorkerSinkTask{id=TEST-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
kafka-connect               | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
kafka-connect               |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
kafka-connect               |   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
kafka-connect               |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
kafka-connect               |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
kafka-connect               |   at java.base/java.lang.Thread.run(Thread.java:834)
kafka-connect               | Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'TEST' is configured with 'delete.enabled=false' and 'pk.mode=record_value' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='my-topic',partition=1,offset=30,timestamp=1615205630513) with a String value and string value schema.
kafka-connect               |   at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:86)
kafka-connect               |   at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
kafka-connect               |   at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
kafka-connect               |   at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
kafka-connect               |   at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
kafka-connect               |   ... 10 more
kafka-connect               | [2021-03-08 12:13:51,339] ERROR [TEST|task-0] WorkerSinkTask{id=TEST-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
kafka-connect               | [2021-03-08 12:13:51,339] INFO [TEST|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:119)
kafka-connect               | [2021-03-08 12:13:51,339] INFO [TEST|task-0] Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:108)
kafka-connect               | [2021-03-08 12:13:51,340] INFO [TEST|task-0] [Consumer clientId=connector-consumer-TEST-0, groupId=connect-TEST] Revoke previously assigned partitions my-topic-1, my-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
kafka-connect               | [2021-03-08 12:13:51,340] INFO [TEST|task-0] [Consumer clientId=connector-consumer-TEST-0, groupId=connect-TEST] Member connector-consumer-TEST-0 sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1010)

I have also tried using the AvroConverter or JsonConverter, but that didn't help either; they just threw other (parsing) errors. From some research I know it has something to do with the structure of the data, but how am I supposed to provide a structure, or convert the table into a "Struct", if the table results from grouping columns and an aggregation?

Any ideas? I am also considering not using a connector at all and instead writing a small program that reads the topics and writes them to MariaDB, in case I can't get the connector to work.


Solution

  • If you're using the JDBC Sink you need to be using a serialisation format for your data that includes the schema, e.g. using Avro, Protobuf, or JSON Schema.

    In ksqlDB you can specify that when you create your object:

    CREATE TABLE MY_TABLE WITH (FORMAT='AVRO') AS 
      SELECT A,B,C,COUNT(*) AS D 
        FROM STREAM_FOO 
    GROUP BY A,B,C;
    

    Note that support for Avro keys was added in ksqlDB 0.15.
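
    As a quick sanity check, the table's serialization can be inspected from ksqlDB; the EXTENDED output should include the key and value formats of the backing topic:

    -- Should report the key and value formats (e.g. AVRO) of the topic backing MY_TABLE
    DESCRIBE EXTENDED MY_TABLE;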

    Now that your data is in Avro, you can create your sink connector using the appropriate converters:

    create sink connector test with (
      'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
      'tasks.max'                           = 1,
      'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
      'value.converter.schema.registry.url' = 'http://schema-registry:8081',
      'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
      'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
      'key.converter.schemas.enable'        = 'false',
      'value.converter.schemas.enable'      = 'true',
      'config.action.reload'                = 'restart',
      'errors.log.enable'                   = 'true',
      'errors.log.include.messages'         = 'true',
      'print.key'                           = 'true',
      'errors.tolerance'                    = 'all',
      'topics'                              = 'my-topic',
      'connection.url'                      = 'jdbc:mysql://mariadb-docker-container:3306/ksql?autoReconnect=true&useSSL=false',
      'connection.user'                     = 'root',
      'connection.password'                 = 'strongest-password-you-have-ever-seen',
      'pk.fields'                           = 'a, b, c',
      'pk.mode'                             = 'record_key',
      'delete.enabled'                      = 'false');
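
    Once the connector is running you can check that rows are arriving in MariaDB. By default the JDBC sink writes to a table named after the topic (table.name.format defaults to ${topic}), and that table must either already exist or be created via auto.create (not set above). A quick check might look like this:

    -- Run against MariaDB; the hyphenated default table name needs backtick quoting
    SELECT * FROM `my-topic` LIMIT 10;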
    

    The two problems that you were encountering:

    1. Using StringConverter means that there is no schema present, hence the error message reporting a String value and string value schema.
    2. The key of a table (the GROUP BY columns) is written to the key of the Kafka message, and so it is from there (record_key) that you should take the pk.fields (see the sketch below).
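
    A quick way to see this is to print the underlying topic; the grouping columns should show up in the message key and the aggregate in the value:

    -- The GROUP BY columns land in the key, the aggregate (d) in the value
    PRINT 'my-topic' FROM BEGINNING LIMIT 5;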
