Tags: elasticsearch, apache-kafka, apache-kafka-connect, debezium

Is it possible to have one Elasticsearch index for one database with multiple tables, using Debezium and Kafka?


I have this connector and sink, which basically creates a topic named "Test.dbo.TEST_A" and writes to the ES index "Test". I have set "key.ignore": "false" so that row updates are also reflected in ES, and "transforms.unwrap.add.fields": "table" to keep track of which table each document belongs to.

{
    "name": "Test-connector", 
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", 
        "tasks.max": "1",
        "database.hostname": "192.168.1.234", 
        "database.port": "1433", 
        "database.user": "user", 
        "database.password": "pass", 
        "database.dbname": "Test", 
        "database.server.name": "MyServer",
        "table.include.list": "dbo.TEST_A",
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.testA",

        "transforms": "unwrap",

        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.unwrap.add.fields":"table"
    }
}
{
    "name": "elastic-sink-test",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "TEST_A",
        "connection.url": "http://localhost:9200/",
        "string.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.enable": "false",
        "schema.ignore": "true",

        "transforms": "topicRoute,unwrap,key",

        "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.topicRoute.regex": "(.*).dbo.TEST_A",                          /* Use the database name */
        "transforms.topicRoute.replacement": "$1",

        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",    
        "transforms.unwrap.drop.tombstones": "false",    

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "Id",       

        "key.ignore": "false",                                                        
        "type.name": "TEST_A",
        "behavior.on.null.values": "delete"                                                     
    }
}

But when I add another connector/sink to include another table, "TEST_B", from the same database, it seems that whenever a row in TEST_A and a row in TEST_B have the same id, one of the rows is deleted from ES.

Is it possible with this setup to have one index = one database, or is the only solution to have one index per table? The reason I want one index = one database is to reduce the number of indexes as more databases are added to ES.


Solution

  • You are reading data changes from different databases/tables and writing them into the same Elasticsearch index, with the ES document ID set to the DB record ID. As you can see, if the DB record IDs collide, the document IDs in the index collide as well, causing existing documents to be overwritten or deleted.

    You have a few options here:

    • Elasticsearch index per DB/table name: you can implement this with different connectors or with a custom Single Message Transform (SMT); see the routing sketch below this list.
    • Globally unique DB records: if you control the schema of the source tables, you can make the primary key a UUID. This prevents ID collisions.
    • As you mentioned in the comments, set the ES document ID to DB/Table/ID. You can implement this change with an SMT; see the key-rewriting sketch at the end.
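
For the first option, here is a minimal sink-config sketch. It assumes the Debezium topics are named like "MyServer.dbo.TEST_A" (as your source connector would produce) and that the sink derives the index name from the routed topic, so each table ends up in its own index ("MyServer_TEST_A", "MyServer_TEST_B", ...) and record IDs can no longer collide across tables. The connector name, topic list and regex are placeholders to adapt to your setup, and depending on the sink version you may need to route to lowercase names, since Elasticsearch only accepts lowercase index names:

{
    "name": "elastic-sink-per-table",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "MyServer.dbo.TEST_A,MyServer.dbo.TEST_B",
        "connection.url": "http://localhost:9200/",

        "transforms": "topicRoute,key",

        "transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.topicRoute.regex": "(.*)\\.dbo\\.(.*)",
        "transforms.topicRoute.replacement": "$1_$2",

        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "Id",

        "key.ignore": "false",
        "behavior.on.null.values": "delete"
    }
}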
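
For the last option there is no ready-made transform that builds a composite document ID, so you would write a small custom SMT and put its JAR on the Connect worker's plugin.path. The sketch below is a rough outline, not tested code: the package/class name and the "key.field" option are made up, and it assumes the record key is either the Debezium key struct or an already-extracted primitive. It replaces the key with "<topic>|<id>", so rows from different tables map to distinct document IDs even inside a single shared index:

package com.example.smt;  // hypothetical package/class, not an existing library

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Replaces the record key with "<topic>|<primary key>", so that rows from
 * different tables map to distinct Elasticsearch document IDs even when
 * their primary keys collide.
 */
public class CompositeDocumentId<R extends ConnectRecord<R>> implements Transformation<R> {

    private String keyField;  // name of the PK field inside the key struct, e.g. "Id"

    @Override
    public void configure(Map<String, ?> configs) {
        Object field = configs.get("key.field");
        keyField = field == null ? "Id" : field.toString();
    }

    @Override
    public R apply(R record) {
        Object key = record.key();
        if (key == null) {
            return record;  // nothing to rewrite
        }
        // Key is either still the Debezium key struct or already a primitive.
        Object id = (key instanceof Struct) ? ((Struct) key).get(keyField) : key;
        String newKey = record.topic() + "|" + id;
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                Schema.STRING_SCHEMA,   // new key is a plain string
                newKey,
                record.valueSchema(),
                record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("key.field", ConfigDef.Type.STRING, "Id",
                ConfigDef.Importance.MEDIUM, "Name of the primary-key field in the record key");
    }

    @Override
    public void close() {
    }
}

You would then reference it from the sink config, e.g. "transforms": "topicRoute,docId" with "transforms.docId.type": "com.example.smt.CompositeDocumentId", in place of the ExtractField$Key transform, and keep "key.ignore": "false" so the sink uses the rewritten key as the document ID.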