I am trying to sink data from Kafka to Postgres; the following is what I have configured.
My ZooKeeper, Kafka, Schema Registry, and Kafka Connect are all working fine. I have even configured a MongoDB sink connector, which works fine.
The issue is only with the Postgres connector.
Kafka topic name - data-test-topic
Following is the data on this topic -
Key: "A"
Value:
{
  "name": "abhishek",
  "lastname": "Rathore"
}
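To double-check what is actually on the topic, a quick consumer sketch like the following can be used (a minimal sketch assuming the kafka-python library is installed and the broker is reachable on localhost:9092; adjust the bootstrap servers to your setup):

# Minimal sketch: inspect the raw key/value bytes on the topic
# (assumes kafka-python and a broker on localhost:9092)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "data-test-topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # read from the beginning of the topic
    consumer_timeout_ms=5000,       # stop iterating when no new messages arrive
)
for msg in consumer:
    print(msg.key, msg.value)       # raw bytes, exactly as the sink connector sees them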
Following is the config I am using for the JDBC sink connector -
{
  "name": "postgres-jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "topics": "data-test-topic",
    "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
    "connection.user": "postgres",
    "connection.password": "arathore",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "upsert",
    "table.name.format": "init",
    "pk.mode": "record_key",
    "pk.fields": "name",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "key.converter.schemas.enable": false
  }
}
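For reference, this is how I submit the config above to the Kafka Connect REST API (a sketch assuming Connect's REST interface listens on localhost:8083 and the Python requests library is available; the file name postgres-sink.json is just a placeholder for the JSON above):

# Submit the connector config to the Kafka Connect REST API
# (assumes Connect's REST interface is on localhost:8083)
import json
import requests

with open("postgres-sink.json") as f:   # placeholder file holding the JSON above
    connector = json.load(f)

resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.text)      # 201 on success, error details otherwise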
Structure of the Postgres table init -
Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description
----------+------+-----------+----------+---------+----------+-------------+--------------+-------------
name | text | | not null | | extended | | |
lastname | text | | not null | | extended | | |
key | text | | not null | | extended | | |
Access method: heap
The error which the JDBC sink connector produces -
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
... 11 more
I am not able to solve this problem even after trying different config values for the sink connector. Please help.
I found the following solution, as suggested by @onecricketeer. The root cause is that JsonConverter with schemas.enable=false deserializes the value into a schemaless map, while the JDBC sink connector requires a Struct with an attached schema (hence "Value schema must be of type Struct"). The fix is to embed a schema envelope in each message and enable schemas on the value converter; pk.mode is also switched to record_value so the name field from the value is used as the primary key.
New topic data -
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "name", "type": "string", "optional": false },
      { "field": "lastname", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "name": "abhishek",
    "lastname": "rathore"
  }
}
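Producing a record in this envelope format looks roughly like this (again a sketch with kafka-python; the broker address is an assumption):

# Produce a schema-enveloped JSON record that the JDBC sink can consume
# (assumes kafka-python and a broker on localhost:9092)
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

value = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "name", "type": "string", "optional": False},
            {"field": "lastname", "type": "string", "optional": False},
        ],
    },
    "payload": {"name": "abhishek", "lastname": "rathore"},
}
producer.send("data-test-topic", key="A", value=value)
producer.flush()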
Updated sink connector config -
{
  "name": "postgres-jdbc-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "topics": "data-test-topic",
    "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
    "connection.user": "postgres",
    "connection.password": "arathore",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "upsert",
    "table.name.format": "init",
    "pk.mode": "record_value",
    "pk.fields": "name",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": true,
    "key.converter.schemas.enable": false
  }
}
Now the data is syncing properly to Postgres.
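To verify the sink end to end, the table can be queried directly (a sketch using psycopg2 with the same connection details as the connector config; from outside the Docker network the host may need to be localhost instead of postgresdb):

# Check that the upserted row actually landed in Postgres
# (assumes psycopg2 is installed; host/credentials taken from the connector config)
import psycopg2

conn = psycopg2.connect(
    host="postgresdb",          # may be localhost outside the Docker network
    port=5432,
    dbname="garuna-rgs",
    user="postgres",
    password="arathore",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT name, lastname FROM init")
    print(cur.fetchall())       # expect [('abhishek', 'rathore')]
conn.close()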