Tags: postgresql, apache-kafka, apache-kafka-connect, debezium

Debezium Postgres sink connector fails to insert values with type DATE


After setting up both source and sink connectors, I run into problems with Postgres columns of type DATE:

ERROR: column "foo" is of type date but expression is of type integer

I checked the Avro schema and see that column foo was serialized as io.debezium.time.Date:

{
    "default": null,
    "name": "foo",
    "type": [
        "null",
        {
            "connect.name": "io.debezium.time.Date",
            "connect.version": 1,
            "type": "int"
        }
    ]
}
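The io.debezium.time.Date type represents a date as the number of days since the Unix epoch, which is why the sink ends up sending a plain integer. As a quick sanity check, the value 18577 from the failing INSERT in the stack trace below can be decoded like this (a sketch using only the JDK; the value is taken from the error, not from my data):

```java
import java.time.LocalDate;

public class EpochDays {
    public static void main(String[] args) {
        // io.debezium.time.Date stores the number of days since 1970-01-01.
        // 18577 is the raw integer from the failing INSERT in the stack trace.
        LocalDate date = LocalDate.ofEpochDay(18577);
        System.out.println(date); // prints 2020-11-11
    }
}
```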

What should I do to make the sink connector insert these values correctly (as DATE, not INTEGER)?

Full stacktrace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
  Hint: You will need to rewrite or cast the expression.
  Position: 249

    ... 12 more

Source config:

{
    "name": "dbz-source-test-1",
    "config": {
        "name":"dbz-source-test-1",
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname":"some.host",
        "database.port":"5432",
        "database.user":"test_debezium",
        "database.password":"password",
        "database.dbname":"dbname",
        "plugin.name":"wal2json_rds",
        "slot.name":"wal2json_rds",
        "database.server.name":"server_test",
        "table.whitelist":"public.test_table",
        "transforms":"route",
        "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement":"dbz_source_$3",
        "topic.selection.strategy":"topic_per_table",
        "include.unknown.datatypes":true,
        "decimal.handling.mode":"double",
        "snapshot.mode":"never"
    }
}

Sink config:

{
    "name": "dbz-sink-test-1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "config.providers" : "file",
        "config.providers.file.class" : "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.file.param.secrets" : "/opt/mysecrets",
        "topics": "dbz_source_test_table",
        "connection.url": "someurl",
        "connection.user": "${file:/opt/mysecrets.properties:user}",
        "connection.password" : "${file:/opt/mysecrets.properties:pass}",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "table.name.format": "dbz_source_",
        "insert.mode": "upsert",
        "pk.field": "id",
        "pk.mode": "record_value"
    }
}

Solution

  • I fixed the problem by switching the source connector's time.precision.mode config to connect.

    When the time.precision.mode configuration property is set to connect, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.

    After this change, the serialization type is different:

    {
        "default": null,
        "name": "foo",
        "type": [
            "null",
            {
                "connect.name": "org.apache.kafka.connect.data.Date",
                "connect.version": 1,
                "logicalType": "date",
                "type": "int"
            }
        ]
    }
    

    The sink connector is aware of the org.apache.kafka.connect.data.Date type and inserts it correctly.
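In config terms, the fix is a single additional property on the source connector (a fragment against the source config shown above; all other fields stay unchanged):

```json
{
    "name": "dbz-source-test-1",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "time.precision.mode": "connect"
    }
}
```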