jdbc apache-kafka debezium change-data-capture

Debezium-JDBC connector handling operations using other columns aside from pk (id)


I set up CDC using Debezium as the source connector and the JDBC connector as the sink, with the data stored in a Postgres database. The pipeline works as expected. We now have a use case where the same records already exist in both databases but with different primary key ids. To handle this, we plan to use an additional column (record_id in the sink database) that is populated with the id (pk) from the source and serves as the basis for update and delete operations.

What should I add to the sink connector configuration to achieve this?

{
    "name": "test_table_sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "utanga_dev.public.test_table",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://host.docker.internal:8081",
        "key.converter.schema.registry.url": "http://host.docker.internal:8081",
        "connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "auto.create": "false",
        "auto.evolve": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields":"record_id", // currently not working
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "table.name.format": "${topic}",
        "delete.enabled":true,
        "delete.handling.mode":"none",
        "delete.tombstone.handling.mode":"drop",
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
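One thing worth checking in the configuration above, which may be one reason the `record_id` setting does not work: the Confluent JDBC sink documentation states that `delete.enabled` requires `pk.mode` to be `record_key`. A delete reaches the sink as a tombstone with a null value, so there is no value to read `record_id` from. The snippet below is a minimal sketch of that rule in Python. `check_delete_support` is a hypothetical helper written for illustration, not part of any Kafka Connect API.

```python
def check_delete_support(config):
    """Return a list of problems with delete handling in a JDBC sink config dict.

    Hypothetical helper: it only encodes the documented rule that
    delete.enabled=true requires pk.mode=record_key.
    """
    problems = []
    # Accept both JSON booleans and the string form "true".
    delete_enabled = str(config.get("delete.enabled", "false")).lower() == "true"
    pk_mode = config.get("pk.mode", "none")
    if delete_enabled and pk_mode != "record_key":
        problems.append(
            "delete.enabled=true requires pk.mode=record_key, found pk.mode=" + pk_mode
        )
    return problems


# The relevant settings from the configuration in the question.
question_config = {
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "record_id",
    "delete.enabled": True,
}

for problem in check_delete_support(question_config):
    print(problem)
```

Switching `pk.mode` to `record_key` satisfies this rule, but the key field is then still named `id`, so it would also need to be renamed to `record_id`.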

Solution

  • I ended up using transformations in my configuration: one renames the key field id to record_id, and another excludes id from the record value, so the source id is never written to the sink's own id column on updates and deletes. Here is my configuration:

    {
        "name": "test_table-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "utanga_dev.public.test_table",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://host.docker.internal:8081",
            "key.converter.schema.registry.url": "http://host.docker.internal:8081",
            "connection.url": "jdbc:postgresql://db:5432/utanga_dev?user=utanga&password=changeme",
            "key.converter.schemas.enable": "true",
            "value.converter.schemas.enable": "true",
            "auto.create": "false",
            "auto.evolve": "true",
            "insert.mode": "upsert",
            "pk.mode": "record_key",
            "transforms": "unwrap, RenameKey, ExcludeId",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "table.name.format": "${topic}",
            "delete.enabled":true,
            "delete.handling.mode":"none",
            "delete.tombstone.handling.mode":"drop",
            "transforms.unwrap.delete.handling.mode": "none",
            "transforms.unwrap.drop.tombstones": "false",
            "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key", 
            "transforms.RenameKey.renames": "id:record_id",
            "transforms.ExcludeId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
            "transforms.ExcludeId.blacklist": "id"
        }
    }

    However, I am still not sure whether this is the correct or most efficient way to do it. Any suggestions would be greatly appreciated.
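
To make the effect of the `RenameKey` and `ExcludeId` transforms easier to see, here is a minimal Python sketch that mimics them on plain dictionaries. It is not Kafka Connect code. The function names and the `name` column are assumptions made for illustration, and the records are shown as they would look after `unwrap` has flattened the Debezium envelope.

```python
def rename_fields(struct, renames):
    """Mimic the 'renames' option of ReplaceField: return a copy with fields renamed."""
    if struct is None:
        return None
    return {renames.get(field, field): value for field, value in struct.items()}


def exclude_fields(struct, excluded):
    """Mimic the exclusion option of ReplaceField: return a copy without the listed fields."""
    if struct is None:  # a tombstone has no value to filter
        return None
    return {field: value for field, value in struct.items() if field not in excluded}


def apply_sink_transforms(key, value):
    """Apply RenameKey (id -> record_id on the key) and ExcludeId (drop id from the value)."""
    new_key = rename_fields(key, {"id": "record_id"})
    new_value = exclude_fields(value, {"id"})
    return new_key, new_value


# An insert or update: the key becomes record_id, and id disappears from the value.
print(apply_sink_transforms({"id": 42}, {"id": 42, "name": "alice"}))

# A delete arrives as a tombstone (null value): only the renamed key remains.
print(apply_sink_transforms({"id": 42}, None))
```

With `pk.mode` set to `record_key`, the sink then matches rows on `record_id` for both upserts and deletes, while the sink table's own `id` column is left untouched. For the upsert to work in Postgres, the `record_id` column would presumably need a unique constraint or index, since the upsert relies on a conflict on the key columns.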