Tags: mysql, sql-server, apache-kafka, debezium, mssql-jdbc

ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets


I am streaming data from a MySQL database to Kafka using Debezium, and then from Kafka to MSSQL using the JDBC sink connector.

My JDBC sink connector config:

name = mssql-sink-connector_test
connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
topics = test
connection.attempts = 3
connection.backoff.ms = 10000
connection.url = jdbc:sqlserver://****;databaseName=****;
connection.user = ******
connection.password = ******
dialect.name = SqlServerDatabaseDialect
table.name.format = process
pk.mode = record_key
pk.fields = id
insert.mode = upsert
delete.enabled = true
transforms = unwrap
transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
auto.create = true
auto.evolve = true
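
For context, the source side is a Debezium MySQL connector (its task, erp-connector-0, appears in the logs below). A minimal sketch of what that configuration looks like; the hostname, credentials, server name, and table list are placeholders, not my actual values:

    # Debezium MySQL source connector (sketch; values are placeholders)
    name = erp-connector
    connector.class = io.debezium.connector.mysql.MySqlConnector
    database.hostname = ****
    database.port = 3306
    database.user = ******
    database.password = ******
    # logical name, used as the prefix for topic names
    database.server.name = ****
    # capture the table that feeds the "test" topic
    table.include.list = ****.process
    transforms = unwrap
    transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState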

The suggestions in `delete.enabled=true` not deleting the record in MySQL through JDBC sink connector didn't help. There is no point in increasing delete.retention.ms, since the default of 86400000 ms (24 hours) is already high enough.

In my topic's configuration:

(./kafka-configs.sh --bootstrap-server ****:** --entity-type topics --entity-name test --describe --all)

delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
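
Note that delete.retention.ms only applies to log compaction, so it matters only if the topic's cleanup.policy includes compact; the same describe output shows that too:

    ./kafka-configs.sh --bootstrap-server ****:** --entity-type topics \
        --entity-name test --describe --all | grep cleanup.policy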

The message written to the Kafka topic when deleting a row in the MySQL table:

 "payload": {
    "before": {
      "id": 91501421,
      "type_id": 11156,
      "priority": 0,
      "status_id": 18,
      "status_dt": 1659112860000,
      "status_user_id": 7711,
      "description": "",
      "create_dt": 1659112860000,
      "create_user_id": 7711,
      "close_dt": null,
      "close_user_id": 0,
      "groups": "",
      "executors": "",
      "title": "",
      "create_uid": 0
    },
    "after": null,
    "source": {
...
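
A Debezium delete actually produces two records: the delete event above (with "after": null) followed by a tombstone with the same key and a null value, which is what the sink connector needs for delete.enabled=true. Roughly (the exact key shape depends on the key converter):

    key:   {"id": 91501421}
    value: null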

Also, after launching Kafka I start getting exceptions like the ones below. They don't seem to affect inserts and updates in the MSSQL table; everything there appears to work fine. Both Kafka itself and the user it is launched as have the correct access rights to the file.

INFO [erp-connector|task-0|offsets] WorkerSourceTask{id=erp-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-07-29 17:36:53,872] ERROR [erp-connector|task-0|offsets] WorkerSourceTask{id=erp-connector-0} Flush of offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSourceTask:554)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
...
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
...
 ERROR [erp-connector|task-0|offsets] WorkerSourceTask{id=erp-connector-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:113)
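
The AccessDeniedException concerns the Connect worker's offset file, not either connector. Assuming a standalone worker, one way to get rid of it is to point offset storage at a directory owned by the user running Connect instead of the shared /tmp (even when permissions look correct, a stale file created by another user, or systemd's PrivateTmp, can make /tmp/connect.offsets unwritable). The path below is just an example:

    # connect-standalone.properties (worker config)
    offset.storage.file.filename=/var/lib/kafka-connect/connect.offsets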

UPD: Also, using SQL Profiler, I captured the statement it uses to change data:

merge into <table_name> with (HOLDLOCK) AS target
    using (select @P0 as "id") as incoming
    on (target."id"=incoming."id")
    when matched then update set ......
    when not matched then insert .........

It looks like a "when not matched by source then delete ..." clause is missing, and the delete.enabled = true setting in the JDBC sink connector configuration does not seem to be taken into account at all (judging by the merge statement, only upsert mode is in effect).
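
As far as I can tell, though, the JDBC sink would not fold deletes into the merge anyway: with delete.enabled=true it reacts to tombstone records by issuing a separate statement, which in Profiler should look roughly like this (a sketch; exact quoting and parameter naming assumed):

    delete from <table_name> where "id" = @P0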

One more observation: if I write the topic messages to a file, I can see the tombstone message. But when viewing messages in the topic without writing them to a file, I couldn't find the tombstone with grep ..ID.. Presumably tombstone messages are removed from the Kafka topic before the consumer processes them; as far as I understand, this is governed by delete.retention.ms, but I already have a large value there.
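
One way to check for tombstones directly on the topic is to have the standard console consumer print keys alongside values; a tombstone then shows up as a key followed by a null value:

    ./kafka-console-consumer.sh --bootstrap-server ****:** --topic test \
        --from-beginning --property print.key=true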


Solution

  • The problem was in the configuration of the Debezium source connector: ExtractNewRecordState drops tombstone records by default (drop.tombstones defaults to true), so the sink never received them. I changed:

    transforms = unwrap
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    

    to:

    transforms = unwrap
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones=false
    

    Thank you, OneCricketeer.