apache-kafka apache-kafka-connect debezium

How to configure a pre-created Kafka topic as a sink in kafka-connect?


I am trying to sink event logs from a PostgreSQL database (the source) into a Kafka topic on Confluent Cloud that has already been created and configured.

FYI, I am using the quay.io/debezium/connect:1.9 Docker image as my Kafka Connect server, an AWS RDS PostgreSQL database as the source, and a Kafka topic on Confluent Cloud as the sink.

I know that there are parameters such as CONNECT_TOPIC_CREATION_ENABLE=false (that is, topic.creation.enable: false in the properties) that can prevent automatic creation of Kafka topics (which is not allowed on my Confluent Cloud broker), but where do I specify the destination topic name?


Solution

  • For Debezium, the topic name is determined by concatenating the logical server name, the schema name, and the table name.

    For example, suppose that fulfillment is the logical server name in the configuration for a connector that is capturing changes in a PostgreSQL installation that has a postgres database and an inventory schema that contains four tables: products, products_on_hand, customers, and orders. The connector would stream records to these four Kafka topics:

    • fulfillment.inventory.products
    • fulfillment.inventory.products_on_hand
    • fulfillment.inventory.customers
    • fulfillment.inventory.orders

    https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-topic-names
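
As a rough illustration of the naming rule quoted above, the following Python sketch joins the three parts with dots. The function name debezium_topic_name is hypothetical and only mirrors the default naming described in the documentation; it is not part of Debezium.

```python
def debezium_topic_name(server: str, schema: str, table: str) -> str:
    """Mimic the default naming rule: <server>.<schema>.<table>."""
    return f"{server}.{schema}.{table}"


# Values taken from the documentation example quoted above.
tables = ["products", "products_on_hand", "customers", "orders"]
topics = [debezium_topic_name("fulfillment", "inventory", t) for t in tables]

for topic in topics:
    print(topic)
```

Each of these topics would have to exist already on a broker that does not allow automatic topic creation.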

    To further manipulate the topic name from the connector, you can use the RegexRouter transform, for example to remove the logical server name "prefix" from each topic or to extract only the table name. You can also completely override the topic name with a static value, as you are asking for.
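
A minimal sketch of what such a transform might look like, written as a Python dict holding the transform-related connector properties. The alias "route" and the topic name "my-precreated-topic" are placeholders, not values from the question. The route function only approximates RegexRouter's behaviour (rewrite the topic when the whole name matches the regex) so the effect can be checked locally; Java and Python regex dialects differ in some details, so treat it as an illustration rather than a faithful port.

```python
import json
import re

# Placeholder for the topic that already exists in Confluent Cloud.
TARGET_TOPIC = "my-precreated-topic"

# Transform-related connector properties; "route" is an arbitrary alias.
route_config = {
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": ".*",
    "transforms.route.replacement": TARGET_TOPIC,
}


def route(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rewrite only if the whole topic name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics pass through unchanged
    # Translate Java-style $1 group references into Python's \g<1> form.
    template = re.sub(r"\$(\d+)", lambda m: rf"\g<{m.group(1)}>", replacement)
    return match.expand(template)


print(json.dumps(route_config, indent=2))

# Static override: every captured table lands in the same topic.
print(route("fulfillment.inventory.products",
            route_config["transforms.route.regex"],
            route_config["transforms.route.replacement"]))

# Alternative: keep only the table name.
print(route("fulfillment.inventory.orders", r"[^.]+\.[^.]+\.(.+)", "$1"))
```

Note that routing every table to one static topic mixes records with different schemas in that topic, so this is probably most comfortable when the connector captures a single table.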


    CONNECT_TOPIC_CREATION_ENABLE is not directly related to Debezium. It corresponds to topic.creation.enable, a general Kafka Connect worker property, which the documentation describes as:

    Whether to allow automatic creation of topics used by source connectors, when source connectors are configured with topic.creation. properties

    https://kafka.apache.org/documentation/#connectconfigs_topic.creation.enable
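
To make the relationship between the environment variable and the worker property concrete, here is a small sketch of the mapping the question implies: strip the CONNECT_ prefix, lowercase the rest, and turn underscores into dots. This assumes the container image follows that convention; the helper name env_to_worker_property is hypothetical.

```python
def env_to_worker_property(env_name: str, prefix: str = "CONNECT_") -> str:
    """Translate CONNECT_FOO_BAR into foo.bar (assumed image convention)."""
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name!r} does not start with {prefix!r}")
    return env_name[len(prefix):].lower().replace("_", ".")


print(env_to_worker_property("CONNECT_TOPIC_CREATION_ENABLE"))
# prints: topic.creation.enable
```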