Tags: apache-kafka, kafka-consumer-api, apache-kafka-connect, database-replication

Error: PK mode for table 'person' is RECORD_KEY, but record key schema is missing


Source: Oracle Database

Target: Postgres

Replication of data using Kafka.

When I do an insert, it works fine. When I update a record on the source, a new entry is added on the target instead of the existing row being updated. When I delete a record from the table on the source (Oracle), the record is not deleted on the target.

Issue: insert works fine, but update and delete are not working. What should I do to fix this? What am I missing here?

Source connector:

{
    "name": "source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:oracle:thin:@192.168.91.139:1521/orcl1",
        "connection.user": "sys as sysdba",
        "connection.password": "oracle",
        "topic.prefix": "person",
        "mode": "incrementing",
        "poll.interval.ms": "1000",
        "incrementing.column.name": "ID",
        "query": "SELECT * from person",
        "numeric.mapping": "none",
        "include.schema.changes": "true",
        "validate.non.null": "false",
        "value.converter.schemas.enable": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}

Sink connector:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "person",
        "connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres ",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "value.converter.schemas.enable": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
}

Solution

  • There are a couple of things I can think of that would explain the update/delete behavior.

    You shouldn't have the transforms; there is no need for them. ExtractNewRecordState unwraps Debezium change events, and this pipeline uses the JDBC source connector. The key.converter should be StringConverter instead of AvroConverter, and you also need to define the primary key field by setting pk.fields.

    Your sink connector could then look something like this:

        {
            "name": "jdbc-sink",
            "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "tasks.max": "1",
                "topics": "person",
                "connection.url": "jdbc:postgresql://192.168.91.229:5432/postgres?user=postgres&password=postgres",
                "auto.create": "true",
                "insert.mode": "upsert",
                "pk.mode": "record_key",
                "pk.fields": "ID",
                "delete.enabled": "true",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://localhost:8081"
            }
        }

    Please check this to-the-point article by rmoffat.
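
    One more thing worth checking on the source side: as far as I know, the JDBC source connector does not set a record key on its own, which would match the "record key schema is missing" error in the title. Below is a minimal, untested sketch of properties that could be merged into the source connector's "config" block. It uses the stock Kafka Connect transforms ValueToKey and ExtractField$Key to copy ID from the value into the key. The transform aliases copyIdToKey and extractId are made-up names for illustration, and ID is assumed to be the primary key column from the question.

```json
{
    "transforms": "copyIdToKey,extractId",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "ID",
    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "ID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```

    Also keep in mind that "mode": "incrementing" only picks up rows with a new ID, so changes to existing rows are not re-read. Catching updates would need timestamp or timestamp+incrementing mode with a last-modified column on the table. Query-based polling cannot see deleted rows at all, so propagating deletes would likely need a log-based CDC source such as Debezium.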