I have a Postgres table connected to Kafka through the Debezium connector. Now I want to edit the messages in the Kafka topic (the Postgres table data) by adding one custom column, and then read the topic as a stream to create a ksqlDB table.
I do not want to write code; I need to achieve this within ksqlDB. Any blogs or ideas would be appreciated.
Update: I finally added the custom column using a Kafka Connect single message transform (InsertField). I also needed this custom column to be present in the key of the Kafka topic, which the same transform handles through its InsertField$Key variant. My connector configuration is below; it may be helpful for someone.
{
  "name": "ksqldb-connector-kafkaetl",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "ipaddress",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "pwd",
    "database.dbname": "practice_1_kafkaetl",
    "database.server.name": "postgres",
    "topic.prefix": "etlsource.prc1",
    "table.include.list": "dbo.table1",
    "column.include.list": "dbo.table1.col1,dbo.table1.col2,dbo.table1.col3,dbo.table1.col4",
    "slot.name": "slot_batch_work_items",
    "transforms": "unwrap,InsertSource",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Key",
    "transforms.InsertSource.static.field": "pactice_id",
    "transforms.InsertSource.static.value": "1"
  }
}
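Note that InsertField$Key only touches the record key. If the same column should also appear in the message value, one possible approach is to chain a second InsertField transform using the $Value variant. The fragment below is a minimal sketch of the transform-related entries that would go inside "config"; the alias InsertSourceValue is a hypothetical name chosen for illustration, and the field name and value simply mirror the key transform above.

```json
{
  "transforms": "unwrap,InsertSource,InsertSourceValue",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Key",
  "transforms.InsertSource.static.field": "pactice_id",
  "transforms.InsertSource.static.value": "1",
  "transforms.InsertSourceValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertSourceValue.static.field": "pactice_id",
  "transforms.InsertSourceValue.static.value": "1"
}
```

Transforms run in the order listed in "transforms", so both InsertField steps see the flattened record produced by unwrap. Because drop.tombstones is "false", tombstone records (null value) also pass through the chain; as far as I know InsertField leaves a null value untouched, but that is worth verifying on your Connect version.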