Tags: apache-kafka, apache-kafka-connect, debezium, cdc

Confluent JDBC Sink Connector can't recognize records captured by the Debezium connector


I am trying to do CDC on PostgreSQL with the Kafka ecosystem. The Debezium connector captures PostgreSQL 13 data and produces it to Kafka, and I attached Confluent's JDBC sink connector to the other side of Kafka, but it does not recognize the JSON messages properly.

Tested environment:

  • PostgreSQL 13 with logical WAL level (Docker)
  • Debezium 1.9 PostgreSQL connector (pre-built jar)
  • confluentinc/kafka-connect-jdbc from GitHub
  • Kafka 2.8
  • Java 17

Tested table:

create table PERSONS (id integer not null, name varchar(50) not null, nickname varchar(50), primary key(id));

The JSON message Debezium produced on an update event (captured by the console consumer):

{
"schema":{
    "type":"struct",
    "fields":[
        {
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "optional":true,
            "name":"localdb_postgres.public.persons.Value",
            "field":"before"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "optional":true,
            "name":"localdb_postgres.public.persons.Value",
            "field":"after"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"string",
                    "optional":false,
                    "field":"version"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"connector"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"ts_ms"
                },
                {
                    "type":"string",
                    "optional":true,
                    "name":"io.debezium.data.Enum",
                    "version":1,
                    "parameters":{
                        "allowed":"true,last,false,incremental"
                    },
                    "default":"false",
                    "field":"snapshot"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"db"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"sequence"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"schema"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"table"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"txId"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"lsn"
                },
                {
                    "type":"int64",
                    "optional":true,
                    "field":"xmin"
                }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
        },
        {
            "type":"string",
            "optional":false,
            "field":"op"
        },
        {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
        },
        {
            "type":"struct",
            "fields":[
                {
                    "type":"string",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"total_order"
                },
                {
                    "type":"int64",
                    "optional":false,
                    "field":"data_collection_order"
                }
            ],
            "optional":true,
            "field":"transaction"
        }
    ],
    "optional":false,
    "name":"localdb_postgres.public.persons.Envelope"
},
"payload":{
    "before":null,
    "after":{
        "id":3,
        "name":"Ko Youngrok",
        "nickname":"SuperMen"
    },
    "source":{
        "version":"1.9.2.Final",
        "connector":"postgresql",
        "name":"localdb_postgres",
        "ts_ms":1653878655898,
        "snapshot":"false",
        "db":"postgres",
        "sequence":"[\"23092016\",\"23092304\"]",
        "schema":"public",
        "table":"persons",
        "txId":516,
        "lsn":23092304,
        "xmin":null
    },
    "op":"u",
    "ts_ms":1653878656372,
    "transaction":null
}
}

The error log from the Confluent JDBC sink connector:

[2022-05-30 15:45:26,750] INFO Unable to find fields [SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}] among column names [nickname, id, name] (io.confluent.connect.jdbc.sink.DbStructure:276)

[2022-05-30 15:45:26,752] ERROR WorkerSinkTask{id=conflue-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:608) io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value

I suppose:

  • The Debezium connector intends the top-level "schema" field to be a type description of the other top-level field, "payload". Every field in "payload" has a matching entry in "schema": "payload" carries the values, and "schema" describes their types.
  • But the Confluent connector regards "schema" as the table schema, so it tries to alter the table. Of course, there is not enough information to create the database columns, and the error is raised.

Any idea how to solve this problem?

This is the Confluent connector configuration:

name=conflue-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=blahblah
connection.user=blahblah
connection.password=blahblah
dialect.name=PostgreSqlDatabaseDialect
insert.mode=insert
pk.mode=none
pk.fields=none
batch.size=3000
delete.enabled=false
topics=localdb_postgres.public.persons

Solution

  • Unable to find fields ... source, ts_ms, after, op, before, transaction ...   among column names [nickname, id, name]
    

    The JDBC Sink Connector infers columns only from the top-level fields of the records it consumes.

    If you want to write only the after data payload, for example, you need to extract it. If you continue to use the JsonConverter, you will still end up with schema and payload, but you will only see what is relevant to that data.
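
    One common way to extract it is Debezium's ExtractNewRecordState single message transform (SMT). A minimal sketch, added to the sink connector properties above ("unwrap" is an arbitrary transform alias chosen here, and the Debezium transforms jar must be on the sink connector's classpath):

        # Unwrap the Debezium envelope so only the "after" state reaches the sink
        transforms=unwrap
        transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
        # Assumed here: drop delete tombstones rather than forwarding them to the sink
        transforms.unwrap.drop.tombstones=true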

    E.g., this would be the full record after the transform:

        "schema" : {
            "type":"struct",
            "fields":[
                {
                    "type":"int32",
                    "optional":false,
                    "field":"id"
                },
                {
                    "type":"string",
                    "optional":false,
                    "field":"name"
                },
                {
                    "type":"string",
                    "optional":true,
                    "field":"nickname"
                }
            ],
            "name":"localdb_postgres.public.persons.Value"
        }, 
        "payload" : {
            "id":3,
            "name":"Ko Youngrok",
            "nickname":"SuperMen"
        }
    }
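
    For completeness, this schema-plus-payload shape is what the JsonConverter emits when schemas are enabled; assuming these are set on the connector (they may instead live in the worker config), the relevant properties are:

        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=true

    With the envelope unwrapped, the record's top-level fields are id, name, and nickname, which match the existing column names, so the "Unable to find fields" warning should disappear.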