I'm trying to do CDC from PostgreSQL into the Kafka ecosystem. The Debezium connector captures PostgreSQL 13 changes and produces them to Kafka, and I attached Confluent's JDBC sink connector to the other side of Kafka, but it does not recognize the JSON message properly.
Tested environment:
Tested table:
create table PERSONS (id integer not null, name varchar(50) not null, nickname varchar(50), primary key(id));
Debezium produced this JSON message on an update event (captured with the console consumer):
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"optional":true,
"name":"localdb_postgres.public.persons.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"localdb_postgres.public.persons.Envelope"
},
"payload":{
"before":null,
"after":{
"id":3,
"name":"Ko Youngrok",
"nickname":"SuperMen"
},
"source":{
"version":"1.9.2.Final",
"connector":"postgresql",
"name":"localdb_postgres",
"ts_ms":1653878655898,
"snapshot":"false",
"db":"postgres",
"sequence":"[\"23092016\",\"23092304\"]",
"schema":"public",
"table":"persons",
"txId":516,
"lsn":23092304,
"xmin":null
},
"op":"u",
"ts_ms":1653878656372,
"transaction":null
}
}
The error log from the Confluent JDBC sink connector:
[2022-05-30 15:45:26,750] INFO Unable to find fields [SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{localdb_postgres.public.persons.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}] among column names [nickname, id, name] (io.confluent.connect.jdbc.sink.DbStructure:276)
[2022-05-30 15:45:26,752] ERROR WorkerSinkTask{id=conflue-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:608) io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "localdb_postgres"."public"."persons" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
Any idea how to solve this problem?
This is the Confluent JDBC sink connector configuration:
name=conflue-jdbc-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=blahblah
connection.user=blahblah
connection.password=blahblah
dialect.name=PostgreSqlDatabaseDialect
insert.mode=insert
pk.mode=none
pk.fields=none
batch.size=3000
delete.enabled=false
topics=localdb_postgres.public.persons
Unable to find fields ... source, ts_ms, after, op, before, transaction ... among column names [nickname, id, name]
The JDBC sink connector infers columns only from the top-level fields of the records it consumes.
If you want to write only the after data payload, for example, you need to extract it before the record reaches the sink. If you keep using the JsonConverter you will still end up with schema and payload sections, but they will only contain what is relevant to that data.
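One way to do the extraction is Debezium's ExtractNewRecordState single message transform (SMT), which replaces the change-event envelope with the contents of its after field. A minimal sketch of what you would add to the sink configuration you posted; the unwrap alias is arbitrary, and it assumes the Debezium connector JARs are on the sink worker's plugin path so the SMT class can be loaded:
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# assumption: if the target table should be "persons" rather than the full topic name,
# you may also need table.name.format=persons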
For example, this is what the full record would look like after the transform:
"schema" : {
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"string",
"optional":true,
"field":"nickname"
}
],
"name":"localdb_postgres.public.persons.Value"
},
"payload" : {
"id":3,
"name":"Ko Youngrok",
"nickname":"SuperMen"
}
}
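With the flattened record the sink sees id, name and nickname as top-level fields, so they map straight onto the PERSONS columns. With your current insert.mode=insert and pk.mode=none the write is roughly equivalent to the statement below (a sketch; the connector actually uses a prepared statement with bind parameters, and the table name depends on table.name.format):
insert into persons (id, name, nickname) values (3, 'Ko Youngrok', 'SuperMen');
Note that for an update event like this one, plain insert mode will fail if the row already exists in the target table; insert.mode=upsert with pk.mode=record_key and pk.fields=id is the usual way to have updates applied in place.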