I am using Debezium source connector (postgres) to track database changes to kafka and I am using kafka jdbc sink connector to transfer the data to another postgres server. Here insert and update are working fine. The problem is with delete. Whenever the delete occurs in the source database debezium sending a tombstone message. But jdbc sink connector trying to insert the row into the destination database and fails. Please help me where am I going wrong?
{
"name": "ksqldb-connector-actions",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "ipadress",
"database.port": "5432",
"database.user": "db",
"database.password": "*********",
"database.dbname": "config",
"database.server.name": "postgres",
"topic.prefix":"kcon",
"table.include.list": "dbo.actions",
"slot.name" : "slot_actions_connector",
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":"false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.unwrap.add.fields":"table,lsn"
}
}
For transforms.unwrap.delete.handling.mode
I tried "rewrite"
as well as "drop"
but both are failing on delete
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "kcon.dbo.actions",
"connection.url": "jdbc:postgresql://ipadress:5432/config",
"connection.user": "wft",
"connection.password": "*******",
"insert.mode": "upsert",
"delete.enabled": "true",
"table.name.format":"dbo.actions_etl_kafka",
"pk.mode":"record_key",
"pk.fields": "action_id",
"db.timezone":"Asia/Kolkata",
"auto.create":"true",
"auto.evolve":"true",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Key",
"transforms.flatten.delimiter": "_",
"input.data.format": "AVRO",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url":"http://schema-registry-ksql:8081",
"value.converter.schema.registry.url":"http://schema-registry-ksql:8081"
}
}
Actually the problem was the kafka connect version which is unable to handle the Tombstone message so all the time delete failed. I was using confluentinc/cp-kafka-connect:5.2.1.
Now I created a custom image with the latest version and the delete works fine. The custom image creation is below. May be helpful to someone.
FROM confluentinc/cp-kafka-connect:6.1.9
ENV CONNECT_PLUGIN_PATH=/usr/share/java/,/usr/share/confluent-hub-components/
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.5.2
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.9.3
RUN confluent-hub install --no-prompt jcustenborder/kafka-connect-transform-common:0.1.0.54