I have this connector and sink, which create the topic "Test.dbo.TEST_A" and write to the ES index "Test". I have set "key.ignore": "false" so that row updates are also applied in ES, and "transforms.unwrap.add.fields": "table" to keep track of which table each document belongs to.
{
  "name": "Test-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.1.234",
    "database.port": "1433",
    "database.user": "user",
    "database.password": "pass",
    "database.dbname": "Test",
    "database.server.name": "MyServer",
    "table.include.list": "dbo.TEST_A",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.testA",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "table"
  }
}
{
  "name": "elastic-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "TEST_A",
    "connection.url": "http://localhost:9200/",
    "string.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schema.enable": "false",
    "schema.ignore": "true",
    "transforms": "topicRoute,unwrap,key",
    "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRoute.regex": "(.*).dbo.TEST_A", /* Use the database name */
    "transforms.topicRoute.replacement": "$1",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "Id",
    "key.ignore": "false",
    "type.name": "TEST_A",
    "behavior.on.null.values": "delete"
  }
}
But when I add another connector/sink to include another table, "TEST_B", from the database, it seems that whenever a row in TEST_A and a row in TEST_B have the same Id, one of the rows is deleted from ES.
Is it possible with this setup to have one index per database, or is the only solution to have one index per table? The reason I want one index per database is to keep the number of indexes down as more databases are added to ES.
You are reading data changes from different databases/tables and writing them into the same Elasticsearch index, with the ES document ID set to the DB record ID. As you can see, if the DB record IDs collide, the index document IDs collide too, so a later write overwrites (or, in the case of a delete, removes) the document that an earlier write from the other table created.
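To make the collision concrete, here is a minimal sketch in Python that models the index as a plain dictionary keyed by document ID. It does not talk to Elasticsearch or Kafka Connect; `sink_write` is a hypothetical stand-in for what the sink does with "key.ignore": "false" and "behavior.on.null.values": "delete", and the `name` field is made up for illustration.

```python
# Toy model of one Elasticsearch index: a dict keyed by document _id.
index = {}

def sink_write(index, key, value):
    """Hypothetical stand-in for the sink: the record key becomes the _id."""
    if value is None:
        # Tombstone record: with behavior.on.null.values=delete the document is removed.
        index.pop(key, None)
    else:
        # Same _id means the existing document is replaced, not kept alongside.
        index[key] = value

# A row from TEST_A and a row from TEST_B that happen to share Id = 1.
sink_write(index, 1, {"Id": 1, "__table": "TEST_A", "name": "row from A"})
sink_write(index, 1, {"Id": 1, "__table": "TEST_B", "name": "row from B"})

print(len(index))           # only one document is left
print(index[1]["__table"])  # the TEST_B row replaced the TEST_A row
```

Both tables write under the same ID, so the index only ever holds whichever row arrived last, and a tombstone for either row removes the single shared document.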
You have a few options here: