database apache-kafka psql debezium

Getting operation performed data from Debezium


I am trying to stream the changes made in my PostgreSQL DB using Debezium and Kafka. My connector configuration is shown in pic 3.

The connector is created and I am able to consume the changes in a Jupyter notebook using my Kafka consumer. But I am only getting the row data that was inserted or updated (pic 2). I am not able to get the operation that was performed (whether it was an INSERT or an UPDATE). Is there a way to get the operation as well? Also, is it possible to get data for other operations such as TRUNCATE, DROP, and ALTER, since the WAL stores this information? (In pic 1 you can see the delete operation being logged, but it doesn't get consumed from Kafka.)

(https://i.sstatic.net/f7t2g.png) (https://i.sstatic.net/pyh8y.png) (https://i.sstatic.net/59mTA.png)


Solution

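  • Add the following line to the connector config. It adds the operation, the table, the sequence number (LSN), and the source timestamp of each change to the record value:

    transforms.unwrap.add.fields=op,table,lsn,source.ts_ms

    This assumes the connector already applies the ExtractNewRecordState SMT under the alias unwrap, as the property name implies. That transform is what reduces each event to the bare row data. The added fields are prefixed with a double underscore, so the operation shows up as __op (c for insert, u for update, d for delete).

    As for deletes, look at the additional delete-handling options, for example transforms.unwrap.delete.handling.mode=rewrite, which keeps delete events and marks them with a __deleted field.

    IIRC, for deletes the framework can also publish a tombstone message: the key of the deleted row with a null value.

    Reference - Doc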
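On the consumer side, reading the operation could look roughly like the sketch below. It is a minimal example under some assumptions, not the asker's actual code: it assumes values are JSON-encoded, that the added fields arrive with the `__` prefix (`__op`, `__deleted`), and that a null value is a tombstone. The helper name `describe_change` and the `OP_NAMES` mapping are made up for illustration.

```python
import json
from typing import Optional

# Debezium operation codes mapped to readable names (illustrative mapping).
OP_NAMES = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}


def describe_change(raw_value: Optional[bytes]) -> dict:
    """Split one flattened Debezium record into operation, metadata, and row."""
    if raw_value is None:
        # A null value is a tombstone: only the message key identifies the row.
        return {"operation": "TOMBSTONE", "deleted": True, "meta": {}, "row": None}

    record = json.loads(raw_value)

    # Assumption: if the JSON converter embeds schemas, the data sits under "payload".
    if isinstance(record, dict) and "schema" in record and "payload" in record:
        record = record["payload"]
    if record is None:
        return {"operation": "TOMBSTONE", "deleted": True, "meta": {}, "row": None}

    # Fields added through add.fields carry a double-underscore prefix.
    meta = {k: v for k, v in record.items() if k.startswith("__")}
    row = {k: v for k, v in record.items() if not k.startswith("__")}

    op = meta.get("__op")
    # __deleted is only present with delete.handling.mode=rewrite; accept str or bool.
    deleted = str(meta.get("__deleted", "false")).lower() == "true"

    return {
        "operation": OP_NAMES.get(op, op),
        "deleted": deleted,
        "meta": meta,
        "row": row,
    }
```

In a consumer loop this would be called once per message with the raw message value, for example `describe_change(message.value)` if the client exposes the value as bytes on a `value` attribute. If the client is already configured with a JSON deserializer, the `json.loads` step would need to be dropped.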