Tags: apache-kafka, apache-kafka-connect, kafka-producer-api

Why a record cannot be deleted using the JDBC Sink Connector in Kafka Connect


My sink properties:

{
  "name": "jdbc-oracle",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
    "connection.user": "ersin",
    "connection.password": "ersin!",
    "auto.create": "true",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
    "name": "jdbc-oracle"
  },
  "tasks": [
    {
      "connector": "jdbc-oracle",
      "task": 0
    }
  ],
  "type": "sink"
}

My connect-avro-distributed.properties:

bootstrap.servers=10.0.0.0:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

I send data like this:

./bin/kafka-avro-console-producer \
  --broker-list 10.0.0.0:9092 --topic orders \
  --property parse.key="true" \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property key.separator="$" \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type":  ["null","int"],"default": null}, {"name":"price","type":  ["null","int"],"default": null}]}' \
  --property schema.registry.url=http://10.0.0.0:8081

I can insert or update data on Oracle like this:

{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}

But when I send the following to delete the record on Oracle, it doesn't delete the row; it just updates the columns to null:

{"id":2}${"id": null, "product": null , "quantity": null , "price": null }

How can I solve this?

Thanks in advance.


Solution

  • Actually, you need to produce a tombstone record. In Kafka, deletion via the JDBC Sink Connector works as follows:

    The connector can delete rows in a database table when it consumes a tombstone record, which is a Kafka record that has a non-null key and a null value. This behavior is disabled by default, meaning that any tombstone records will result in a failure of the connector, making it easy to upgrade the JDBC connector and keep prior behavior.

    Deletes can be enabled with delete.enabled=true, but only when the pk.mode is set to record_key. This is because deleting a row from the table requires the primary key be used as criteria.

    Enabling delete mode does not affect the insert.mode.

    Also note that a tombstone record is only removed from the topic after delete.retention.ms milliseconds, which currently defaults to 24 hours.


    Therefore, try reducing this configuration and see if it works. To do so, run the following command (an alternative for newer Kafka versions is sketched right after it):

     ./bin/kafka-topics.sh \
        --alter \
        --zookeeper localhost:2181 \
        --topic orders \
        --config delete.retention.ms=100
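
    Depending on your Kafka version, kafka-topics.sh may refuse to alter topic configs; in that case the roughly equivalent kafka-configs.sh command (a sketch, adjust the ZooKeeper address to your environment) would be:

     ./bin/kafka-configs.sh \
        --alter \
        --zookeeper localhost:2181 \
        --entity-type topics \
        --entity-name orders \
        --add-config delete.retention.ms=100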
    

    Now, once the configuration is done, all you need to do is produce a message with a non-null key and a null value. Note that the Kafka Avro console producer cannot be used to produce a null record, as the user input is parsed as UTF-8. Therefore,

    {"id":2}${"id": null, "product": null , "quantity": null , "price": null }
    

    is not an actual tombstone message.

    You could, however, make use of kafkacat, but that only works for plain string/JSON messages:

    # Produce a tombstone (a "delete" for compacted topics) for key 
    # "abc" by providing an empty message value which -Z interprets as NULL:
    
    echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:
    

    In your case, though, this wouldn't work, as you need to send Avro messages. Therefore, I would suggest writing a very simple Avro producer in your favourite language so that you can actually send a tombstone message, for example as sketched below.
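
    A minimal sketch in Java using the Confluent KafkaAvroSerializer (assuming the kafka-clients and kafka-avro-serializer dependencies are on the classpath; the broker and Schema Registry addresses are the ones from your question):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TombstoneProducer {

        // Same key schema as the one passed to kafka-avro-console-producer
        private static final String KEY_SCHEMA =
            "{\"type\":\"record\",\"name\":\"key_schema\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.0:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://10.0.0.0:8081");

            // Build the Avro key {"id": 2} for the row that should be deleted
            Schema keySchema = new Schema.Parser().parse(KEY_SCHEMA);
            GenericRecord key = new GenericData.Record(keySchema);
            key.put("id", 2);

            try (KafkaProducer<GenericRecord, GenericRecord> producer = new KafkaProducer<>(props)) {
                // A null value (not a record whose fields are all null) is what
                // the JDBC Sink Connector treats as a tombstone, i.e. a delete
                producer.send(new ProducerRecord<>("orders", key, null));
                producer.flush();
            }
        }
    }

    Once this record lands on the orders topic, the connector (with delete.enabled=true and pk.mode=record_key) should issue a DELETE for the row whose id is 2 instead of updating its columns to null.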