apache-kafka apache-kafka-streams ksqldb

Manipulate kafka stream from a topic


I have a Postgres table that is connected to Kafka through the Debezium connector. Now I want to edit the messages in the Kafka topic (the Postgres table data) by adding one custom column, and then consume the topic as a stream to create a ksqlDB table.

I do not want to write code; I need to achieve this within ksqlDB. Any help, blogs, or ideas would be appreciated.


Solution

  • I finally added the custom column using a transform. I also needed this custom column to be present in the key of the Kafka topic, and the same transform achieves that. My connector config is below; it may be helpful for someone:

    {
      "name": "ksqldb-connector-kafkaetl",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "ipaddress",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "pwd",
        "database.dbname": "practice_1_kafkaetl",
        "database.server.name": "postgres",
        "topic.prefix":"etlsource.prc1",
        "table.include.list": "dbo.table1",
        "column.include.list":"dbo.table1.col1,dbo.table1.col2,dbo.table1.col3,dbo.table1.col4",
        "slot.name" : "slot_batch_work_items",
        "transforms": "unwrap,InsertSource",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false",
        "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Key",
        "transforms.InsertSource.static.field":"pactice_id",
        "transforms.InsertSource.static.value":"1"
      }
    }
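To make the effect of the two transforms in the config above easier to see, here is a minimal Python sketch that imitates them on a single in-memory record. It is only an illustration, not how Kafka Connect implements them: the function names, the sample row, and the assumption that `col1` is the table's primary key are all hypothetical, and only create/update events that carry an `after` image are handled.

```python
def unwrap(value):
    """Imitate ExtractNewRecordState: keep only the 'after' row image
    from a Debezium change-event envelope."""
    return value["after"]


def insert_static_field(key, field, static_value):
    """Imitate InsertField$Key with static.field / static.value:
    return a copy of the record key with one extra constant field."""
    new_key = dict(key)
    new_key[field] = static_value
    return new_key


def apply_transforms(key, value):
    """Apply the chain in the order given by "transforms": unwrap, InsertSource."""
    new_value = unwrap(value)
    # Field name and value are copied from the connector config above.
    new_key = insert_static_field(key, "pactice_id", "1")
    return new_key, new_value


# Hypothetical change event for one row of dbo.table1 (col1 assumed to be the key).
key = {"col1": 42}
value = {
    "before": None,
    "after": {"col1": 42, "col2": "a", "col3": "b", "col4": "c"},
    "op": "c",
}

new_key, new_value = apply_transforms(key, value)
print(new_key)    # {'col1': 42, 'pactice_id': '1'}
print(new_value)  # {'col1': 42, 'col2': 'a', 'col3': 'b', 'col4': 'c'}
```

In this sketch the extra field ends up in the key only, which is what the `$Key` variant of the transform targets. If the field is also needed in the message value, a second transform of type `org.apache.kafka.connect.transforms.InsertField$Value` is probably the place to look.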