Tags: sql-server, debezium

Additional unique index referencing columns not exposed by CDC causes exception


I am using the Debezium SQL Server connector to capture CDC on a table for which we expose only a subset of its columns. The table has two unique indexes, A and B. Neither is marked as the PRIMARY KEY, but index A is logically the primary key in our product and is what I want the connector to use. Index B references a column we don't expose to CDC. Index B isn't truly used in our product as a unique key for the table; it is marked UNIQUE only because the data is known to be unique and marking it that way gives us a performance benefit.
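For context, the column subset is chosen when CDC is enabled on the table. A sketch of how this table might have been enabled, using the standard sys.sp_cdc_enable_table procedure (the parameter values are assumptions based on the example table below, where UniqueColumn1 is the column not exposed to CDC):

```sql
-- Hypothetical sketch: enable CDC on myschema.mytable for a subset of columns.
-- sys.sp_cdc_enable_table is the standard SQL Server procedure; the values
-- here are assumptions matching the example table in this post.
EXEC sys.sp_cdc_enable_table
    @source_schema        = N'myschema',
    @source_name          = N'mytable',
    @role_name            = NULL,
    @captured_column_list = N'MyKeyColumn1, MyKeyColumn2, Data';  -- UniqueColumn1 not captured
```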

This appears to result in the error below. I've tried using the message.key.columns option on the connector to specify index A's columns as the key for this table, hoping it would ignore index B. However, the connector still seems to want to do something with index B.

  1. How can I work around this situation?
  2. For my own understanding, why does the connector care about indexes that reference columns not exposed by CDC?
  3. For my own understanding, why does the connector care about any index besides the one configured on the CDC table (i.e., see the cdc.change_tables.index_name documentation)?
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:290)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: The column "mynoncdccolumn" is referenced as PRIMARY KEY, but a matching column is not defined in table "mydatabase.myschema.mytable"!
    at io.debezium.relational.TableEditorImpl.lambda$updatePrimaryKeys$0(TableEditorImpl.java:105)
    at java.base/java.util.ArrayList.removeIf(ArrayList.java:1702)
    at java.base/java.util.ArrayList.removeIf(ArrayList.java:1690)
    at io.debezium.relational.TableEditorImpl.updatePrimaryKeys(TableEditorImpl.java:101)
    at io.debezium.relational.TableEditorImpl.create(TableEditorImpl.java:254)
    at io.debezium.connector.sqlserver.SqlServerConnection.getTableSchemaFromTable(SqlServerConnection.java:428)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.getCdcTablesToQuery(SqlServerStreamingChangeEventSource.java:378)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:121)

My connector configuration

{
  "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "database.user": "myuser",
  "database.password": "mspassword",
  "database.dbname": "mydb",
  "database.hostname": "mysqlserverinstance",
  "database.history.kafka.bootstrap.servers": "b-1.mycluster:9092,b-2.mycluster:9092,b-3.mycluster:9092",
  "database.history.kafka.topic": "myhistorytopic",
  "database.server.name": "myserver",
  "message.key.columns": "myschema.mytable:MyKeyColumn1,MyKeyColumn2;"
}

The table definition

CREATE TABLE [myschema].[mytable]
(
[MyKeyColumn1] [int] NOT NULL,
[MyKeyColumn2] [int] NOT NULL,
[Data] [varchar] (255) NOT NULL,
[UniqueColumn1] [timestamp] NOT NULL
)
GO

CREATE UNIQUE NONCLUSTERED INDEX [IndexB] ON [myschema].[mytable] ([UniqueColumn1])
GO
CREATE UNIQUE NONCLUSTERED INDEX [IndexA] ON [myschema].[mytable] ([MyKeyColumn1], [MyKeyColumn2])
GO
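Regarding question 3: you can check which index SQL Server's CDC itself recorded for the capture instance using standard CDC system objects (the schema and table names below are from the example above):

```sql
-- Inspect the index SQL Server CDC associated with each capture instance.
-- cdc.change_tables is a standard CDC system table.
SELECT capture_instance, index_name
FROM cdc.change_tables;

-- The built-in helper reports the same, plus the captured column list:
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = N'myschema',
    @source_name   = N'mytable';
```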

Solution

  • One of the Debezium contributors affirmed that this is a product bug (https://gitter.im/debezium/user?at=60b8e96778e1d6477d7f40b5). I have created an issue: https://issues.redhat.com/browse/DBZ-3597.

    Edit:

    A PR has been published and approved to fix the issue. The fix is in the current 1.6 beta snapshot build.

    There is also a possible workaround. The index names are the key to the problem: they appear to be processed in alphabetical order, and only the first one is taken into consideration. If you can rename your indices so that the one carrying the key columns sorts first, you should be unblocked.
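    A minimal sketch of that renaming workaround using sp_rename (the new name zz_IndexB is an arbitrary choice whose only purpose is to sort after IndexA):

    ```sql
    -- Rename IndexB so that the index carrying the key columns (IndexA)
    -- sorts first alphabetically; only the first index is considered.
    EXEC sp_rename N'myschema.mytable.IndexB', N'zz_IndexB', N'INDEX';
    ```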