Source : Oracle Database
Target : Postgres
Replication of data using Kafka.
When I do an insert, it works fine. When I update a record on the source, a new row is added on the target instead of the existing row being updated. When I delete a record from the table on the source (Oracle), the record is not deleted on the target.
Issue: Insert works fine, but update and delete are not working. Please let me know what I should do to fix this. What am I missing here?
Source connector:
{
"name": "source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@192.168.91.139:1521/orcl1",
"connection.user": "sys as sysdba",
"connection.password": "oracle",
"topic.prefix": "person",
"mode": "incrementing",
"poll.interval.ms": "1000",
"incrementing.column.name":"ID",
"query": "SELECT * from person",
"numeric.mapping":"none",
"include.schema.changes": "true",
"validate.non.null": "false",
"value.converter.schemas.enable": "true",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}
}
Sink connector:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics":"person",
"connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"delete.enabled": "true",
"value.converter.schemas.enable": "true",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
}
}
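There are a couple of things I can think of that would explain the update/delete behavior.
You shouldn't have the transforms; there is no need for them here (ExtractNewRecordState unwraps Debezium change events, and the JDBC source connector does not produce those).
The key.converter should be StringConverter instead of AvroConverter.
You also need to define the primary-key field by setting pk.fields.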
In the end, your sink connector config could look something like this:
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics":"person",
"connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
"auto.create": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "ID",
"delete.enabled": "true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081"
Please check this to-the-point article by rmoffat.
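One more thing worth checking is the source side, since the sink settings above only help if each record arrives with a key and updated rows are actually re-published. As far as I know, the JDBC source connector in incrementing mode only picks up rows with a new, higher ID, and it does not set a record key by default. Below is a minimal sketch of the source properties that could be changed, assuming the person table has a last-modified timestamp column (UPDATED_AT is a hypothetical name, not something from the question) and that ID is the primary key:

```json
{
  "mode": "timestamp+incrementing",
  "incrementing.column.name": "ID",
  "timestamp.column.name": "UPDATED_AT",
  "transforms": "createKey,extractId",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.createKey.fields": "ID",
  "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractId.field": "ID",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```

ValueToKey and ExtractField$Key are stock Kafka Connect transforms; the aliases createKey and extractId are arbitrary. Deletes are a separate matter: a query-based source never sees a row that is gone, so delete.enabled on the sink has nothing to act on unless something produces tombstone records (a key with a null value), which typically means a log-based CDC source.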