I'm using debezium SQL Server to track changes on a production base. The topic is created, CDC is working like a charm, but when trying to use jdbcSinkConnector to dump the data in another Sql Server DB, I'm encountering the following error.
com.microsoft.sqlserver.jdbc.SQLServerException: One or more values is out of range of values for the datetime2 SQL Server data type
On the source database the sql datatype is timestamp2(7)
.
The kafka event is 1549461754650000000.
The schema type is INT64.
The schema name io.debezium.time.NanoTimestamp.
I can't find a way to tell the TimestampConverter that is value isn't expressed in millis, or micros, but nanoseconds (would not work with microseconds anyway).
here is my connector configuration
{
"name": "cdc.swip.bi.ods.sink.contract",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "swip.swip_core.contract",
"connection.url": "jdbc:sqlserver://someip:1234;database=DB",
"connection.user": "loloolololo",
"connection.password": "muahahahahaha",
"dialect.name": "SqlServerDatabaseDialect",
"auto.create": "false",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "unwrap,created_date,modified_date",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.created_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.created_date.target.type": "Timestamp",
"transforms.created_date.field": "created_date",
"transforms.modified_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.modified_date.target.type": "Timestamp",
"transforms.modified_date.field": "modified_date",
"insert.mode": "insert",
"delete.enabled": "false",
"pk.fields": "id",
"pk.mode": "record_value",
"schema.registry.url": "http://localhost:8081",
"table.name.format": "ODS.swip.contract"
}
}
I forgort to post the answer.
The property "time.precision.mode":"connect"
does the trick
{
"name":"debezium-connector-sqlserver",
"config": {
"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter.schemas.enable":"true",
"database.hostname":"someHost",
"database.port":"somePort",
"database.user":"someUser",
"database.password":"somePassword",
"database.dbname":"someDb",
"database.server.name":"xxx.xxx",
"database.history.kafka.topic":"xxx.xxx.history",
"time.precision.mode":"connect",
"database.history.kafka.bootstrap.servers":"example.com:9092"
}
}