
Postgres debezium connector changes are not showing up under topic messages


I am new to Kafka and Debezium. I have successfully created a connector for my PostgreSQL database.

I am using a sample Postgres image that ships with some inventory tables and test data.

My registered Postgres Debezium connector looks as follows:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "dbserver1",
        "table.include.list": "inventory.customers"
    }
}

I have additionally created a new schema named 'membership' in my Postgres database, with a table named 'members' (four columns: membership_id (bigint), firstname (name), lastname (name), created (date)).

Assuming that I can create a separate connector for my new 'membership.members' table, I registered the following additional connector:

{
    "name": "membership-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "database.server.name": "dbserver1",
        "table.include.list": "membership.members"
    }
}

Looking at the topics in my Kafka UI, I only see the inventory topic, with the snapshot and full tracking of additional inserts and updates. My membership topic does not show up, and none of its changes are being captured.

Taking my entire Kafka Compose setup down and registering my membership connector first does make it show up under my Kafka topics, and it also snapshots the data. However, any further inserts only show the incremented identity value and no other data (firstname and lastname are empty). Even the created date is just a number:

"payload": {
        "before": null,
        "after": {
            "membership_id": 1002,
            "firstname": "", <--- why empty?
            "lastname": "", <--- why empty?
            "created": 19743
        },
        "source": {
            "version": "1.9.7.Final",
            "connector": "postgresql",
            "name": "dbserver1",
            "ts_ms": 1705835798528,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"37044096\",\"37044512\"]",
            "schema": "membership",
            "table": "members",
            "txId": 769,
            "lsn": 37044512,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1705835798622,
        "transaction": null
    }

So my question is: why am I not seeing all my data in the payload for membership inserts?

I am not sure what triggered it, but the snapshot on membership is working now. I suppose creating a topic and registering my connector can take some time?

However, I have now done additional inserts into my membership table and am still waiting for the Kafka topic to pick up the changes from Postgres. The change messages are not showing yet. Inserts into the inventory.customers table work and show up in its topic, but the membership changes still do not appear.


Solution

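  • Each connector needs its own replication slot, so slot.name must be set explicitly in the connector JSON. Without it, every connector uses the default slot name, debezium, so two connectors on the same database end up sharing one slot: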

    Example:

    {
        "name": "inventory-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "postgres",
            "database.server.name": "dbserver1",
            "schema.include.list": "inventory",
            "slot.name": "my_slot_name01"
        }
    }
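
Applied to the two connectors from the question, the same idea might be sketched as follows. This is only an illustration: the slot names (inventory_slot, membership_slot), the helper functions, and the Kafka Connect URL http://localhost:8083 are assumptions, not values from the original setup. The script builds both connector definitions, checks that no two share a slot.name, and offers a helper that posts a definition to the Kafka Connect REST API.

```python
import json
import urllib.request

# Settings shared by both connectors, copied from the question.
BASE_CONFIG = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",  # same value as in the question
}


def build_connector(name, slot_name, tables):
    """Return a connector definition with its own replication slot."""
    config = dict(BASE_CONFIG)
    config["slot.name"] = slot_name
    config["table.include.list"] = tables
    return {"name": name, "config": config}


def check_unique_slots(connectors):
    """Raise ValueError if two connector definitions share a slot.name."""
    slots = [c["config"]["slot.name"] for c in connectors]
    duplicates = sorted({s for s in slots if slots.count(s) > 1})
    if duplicates:
        raise ValueError(f"slot.name used more than once: {duplicates}")


def register(connector, connect_url="http://localhost:8083"):
    """POST a definition to Kafka Connect (the URL is an assumption)."""
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


# Hypothetical slot names; any names work as long as they differ.
CONNECTORS = [
    build_connector("inventory-connector", "inventory_slot", "inventory.customers"),
    build_connector("membership-connector", "membership_slot", "membership.members"),
]
check_unique_slots(CONNECTORS)
```

Calling register(c) for each entry in CONNECTORS would then create both connectors, each reading from its own slot.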
    

    If 'before' is null in the Kafka messages for updates, change the replica identity of the table so that Postgres logs the full previous row:

    ALTER TABLE myschema.mytable REPLICA IDENTITY FULL;
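
On the "created": 19743 value in the question's payload: with Debezium's default time.precision.mode (adaptive), a Postgres date column is emitted as the number of days since 1970-01-01, so the number is the date itself in encoded form. A minimal sketch of decoding it on the consumer side, where the function name is made up for illustration:

```python
from datetime import date, datetime, timedelta, timezone

EPOCH = date(1970, 1, 1)


def debezium_date_to_date(days_since_epoch):
    """Convert a Debezium DATE value (days since epoch) to a date."""
    return EPOCH + timedelta(days=days_since_epoch)


# Value of "created" from the payload in the question.
created = debezium_date_to_date(19743)
print(created)  # 2024-01-21

# Cross-check against "ts_ms" from the same payload (milliseconds, UTC).
event_day = datetime.fromtimestamp(1705835798528 / 1000, tz=timezone.utc).date()
print(event_day == created)  # True
```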