postgresql, apache-kafka, apache-kafka-connect, debezium

Kafka Debezium Connector Working But not ingesting data into PostgreSQL


I have successfully connected PostgreSQL to Kafka using the debezium-connector-postgres-1.8.0.Final-plugin connector. Below is my Standalone.properties file:

rest.port=8084
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/home/abc/debezium-connector-postgres-1.8.0.Final-plugin/debezium-connector-postgres/debezium-connector-postgres-1.8.0.Final.jar,


############################# Zookeeper #############################


zookeeper.connect=localhost:2181

My connector file is given below:

name= inventory_db-connector
connector.class= io.debezium.connector.postgresql.PostgresConnector
tasks.max= 1
database.hostname= localhost
database.port= 5432
database.user= user
database.password= pass
database.dbname = testdb
database.server.name= dbserver1
database.whitelist= testdb
database.history.kafka.bootstrap.servers= kafka=9092
database.history.kafka.topic= schema-changes.inventory
topics=inventory
topic.inventory.inventory.conditions.mapping=time=value.time,device_id=value.device_id,temperature=value.temperature,humidity=value.humidity 

The connector starts successfully from the terminal with the following command: kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties /home/abc/inventory_db-connector.

My inventory.txt contains the data, in JSON format, that I want to ingest into the database. I have also checked the inventory topic with the Kafka console consumer, and it shows that the data is loaded:

{"time":"2022-02-08 16:18:00-05", "device_id":"weather-pro001000", "temperature":"200", "humidity":"200"}

Now we are trying to ingest the data into Postgres using the following command: cat inventory.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:". However, the data is not inserted into the database, the command does not show any error, and my log is also empty. I cannot figure out what is wrong.


Solution

  • The Debezium connector is a source connector, i.e. it reads changes from PostgreSQL and publishes them to Kafka. It does not write data from Kafka topics into the database.

    If you want to ingest data from Kafka into PostgreSQL, try using the JDBC sink connector:

    https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html
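
As a rough illustration of that suggestion, a sink connector file for the inventory topic might look something like the sketch below. It is not a tested configuration: the connector name is made up, the connection values are copied from the question, and it assumes the Confluent JDBC sink plugin and a PostgreSQL JDBC driver are already available on the worker's plugin.path. Note also that the JDBC sink needs records that carry a schema, so the plain JSON shown in the question would most likely be rejected. With JsonConverter and schemas.enable=true, each message has to be wrapped in a {"schema": ..., "payload": ...} envelope (or a schema-aware format such as Avro has to be used instead).

```properties
# Hypothetical connector name, chosen for illustration only
name=inventory-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# Topic to read from (the topic the question's consumer check refers to)
topics=inventory

# Connection details reused from the question; adjust to your setup
connection.url=jdbc:postgresql://localhost:5432/testdb
connection.user=user
connection.password=pass

# Plain inserts, no primary key handling; create the target table if it is missing
insert.mode=insert
pk.mode=none
auto.create=true

# Override the worker converters for this connector so values carry a schema
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

It would be started the same way as the Debezium connector, by passing this file to connect-standalone.sh after the worker properties file. With auto.create=true and no table.name.format set, the sink writes to a table named after the topic.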