Tags: postgresql, jdbc, apache-kafka, apache-kafka-connect

JDBC Sink - Value schema must be of type Struct


I am trying to sink data from Kafka to Postgres. This is my setup:

ZooKeeper, Kafka, Schema Registry and Kafka Connect are all working fine. I have also configured a Mongo sink connector, which works fine.

The issue is only with the Postgres connector.

Kafka topic name: data-test-topic

Following is the data on this topic -

Key: "A"
{
    "name": "abhishek",
    "lastname": "Rathore"
}

Following is the config I am using for the JDBC sink connector -

{
    "name": "postgres-jdbc-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "data-test-topic",
        "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
        "connection.user": "postgres",
        "connection.password": "arathore",
        "auto.create": true,
        "auto.evolve": true,
        "insert.mode": "upsert",
        "table.name.format": "init",
        "pk.mode": "record_key",
        "pk.fields": "name",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": false,
        "key.converter.schemas.enable": false
    }
}

Structure of the Postgres table init -

  Column  | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
----------+------+-----------+----------+---------+----------+-------------+--------------+-------------
 name     | text |           | not null |         | extended |             |              | 
 lastname | text |           | not null |         | extended |             |              | 
 key      | text |           | not null |         | extended |             |              | 
Access method: heap

The error that the JDBC sink connector produces -

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more

I have not been able to solve this problem, even after trying different config values for the sink connector. Please help.


Solution

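  • I found the following solution, as suggested by @onecricketeer. The JDBC
    sink needs a schema for the record value. With
    value.converter.schemas.enable set to false, JsonConverter delivers a
    schemaless value, which is why the task fails with "Value schema must be
    of type Struct". The fix has three parts: each message carries its schema
    alongside the payload, value.converter.schemas.enable is set to true, and
    pk.mode is changed to record_value so that the primary key is taken from
    the name field of the value.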

    Topic data (the message value, wrapped in a schema/payload envelope) -

    {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "field": "name",
                    "type": "string",
                    "optional": false
                },
                {
                    "field": "lastname",
                    "type": "string",
                    "optional": false
                }
            ]
        },
        "payload": {
            "name": "abhishek",
            "lastname": "rathore"
        }
    }
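
Writing the envelope by hand for every message gets tedious, so here is a minimal Python sketch of how a producer might build it from a flat record. The helper names (connect_type, wrap_with_schema) are hypothetical and not part of any Kafka library. The type mapping is an assumption that covers only a few scalar types, and every field is marked as non-optional to match the topic data above.

```python
import json

# Mapping from Python types to Kafka Connect JSON schema type names.
# bool is listed before int because bool is a subclass of int in Python.
_CONNECT_TYPES = [
    (bool, "boolean"),
    (int, "int64"),
    (float, "double"),
    (str, "string"),
]


def connect_type(value):
    """Return the Connect schema type name for a scalar Python value."""
    for py_type, name in _CONNECT_TYPES:
        if isinstance(value, py_type):
            return name
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def wrap_with_schema(record):
    """Wrap a flat dict in the schema/payload envelope read by JsonConverter
    when schemas.enable is true. All fields are treated as required."""
    fields = [
        {"field": key, "type": connect_type(value), "optional": False}
        for key, value in record.items()
    ]
    return {"schema": {"type": "struct", "fields": fields}, "payload": record}


message = wrap_with_schema({"name": "abhishek", "lastname": "rathore"})
# The serialized string is what the producer would send as the message value.
print(json.dumps(message, indent=2))
```

Nested structures, nullable fields and logical types such as timestamps are deliberately left out; for those, Avro with the Schema Registry may be a better fit than hand-built JSON envelopes.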
    

    Sink connector config -

    {
        "name": "postgres-jdbc-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": 1,
            "topics": "data-test-topic",
            "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
            "connection.user": "postgres",
            "connection.password": "arathore",
            "auto.create": true,
            "auto.evolve": true,
            "insert.mode": "upsert",
            "table.name.format": "init",
            "pk.mode": "record_value",
            "pk.fields": "name",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter.schemas.enable": true,
            "key.converter.schemas.enable": false
        }
    }
    

    With these changes, the data now syncs to Postgres correctly.
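
For completeness, here is a rough sketch of how the connector config from the solution could be submitted to Kafka Connect through its REST API (POST /connectors). The worker address http://localhost:8083 is an assumption, as are the helper names build_request and submit; adjust them to your environment. The code only builds the request, and sending it requires a running Connect worker.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable at this address.
CONNECT_URL = "http://localhost:8083/connectors"

# Connector definition copied from the solution above.
connector = {
    "name": "postgres-jdbc-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "topics": "data-test-topic",
        "connection.url": "jdbc:postgresql://postgresdb:5432/garuna-rgs",
        "connection.user": "postgres",
        "connection.password": "arathore",
        "auto.create": True,
        "auto.evolve": True,
        "insert.mode": "upsert",
        "table.name.format": "init",
        "pk.mode": "record_value",
        "pk.fields": "name",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": True,
        "key.converter.schemas.enable": False,
    },
}


def build_request(definition, url=CONNECT_URL):
    """Build (but do not send) the POST request that creates the connector."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def submit(request):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status


request = build_request(connector)
# To actually create the connector, call: submit(request)
```

If a connector with the same name already exists, the POST is rejected; in that case the existing connector has to be updated or deleted first.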