I have successfully connected PostgreSQL to Kafka using the debezium-connector-postgres-1.8.0.Final-plugin connector. Below is my Standalone.properties file:
rest.port=8084
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/abc/debezium-connector-postgres-1.8.0.Final-plugin/debezium-connector-postgres/debezium-connector-postgres-1.8.0.Final.jar,
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
My connector file is given below:
name= inventory_db-connector
connector.class= io.debezium.connector.postgresql.PostgresConnector
tasks.max= 1
database.hostname= localhost
database.port= 5432
database.user= user
database.password= pass
database.dbname = testdb
database.server.name= dbserver1
database.whitelist= testdb
database.history.kafka.bootstrap.servers= kafka=9092
database.history.kafka.topic= schema-changes.inventory
topics=inventory
topic.inventory.inventory.conditions.mapping=time=value.time,device_id=value.device_id,temperature=value.temperature,humidity=value.humidity
It runs successfully from the terminal with the following command:
kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties /home/abc/inventory_db-connector
My inventory.txt contains the data, in JSON format, that I want to ingest into the database. I have also checked the inventory topic with the Kafka console consumer, and it shows that the data is loaded:
{"time":"2022-02-08 16:18:00-05", "device_id":"weather-pro001000", "temperature":"200", "humidity":"200"}
Now we are trying to ingest the data into Postgres using the following command:
cat inventory.txt | kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo --property "parse.key=true" --property "key.separator=:";
But the problem is that the data is not inserted into the database. The command does not show any error, and my log is also empty.
I cannot seem to figure out this problem.
The Debezium connector is a source connector, i.e. it reads data from PostgreSQL into Kafka. It does not write data from a Kafka topic back into the database.
If you want to ingest data into PostgreSQL, try using the JDBC sink connector:
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html
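As a rough starting point, a sink connector file for the setup in the question might look something like the sketch below. The host, database name, and credentials are copied from the question. The connector name, the target table name `conditions`, and the choice of `insert.mode`, `pk.mode`, and `auto.create` are my assumptions, not something taken from your setup. It also assumes the JDBC sink connector plugin has been installed separately and is reachable through `plugin.path`.

```properties
# Hypothetical JDBC sink connector file; the name is an assumption
name=inventory-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# Topic to read from ("inventory" is the topic checked in the question)
topics=inventory

# Connection details copied from the Debezium config in the question
connection.url=jdbc:postgresql://localhost:5432/testdb
connection.user=user
connection.password=pass

# Assumed target table; without this line the table name defaults to the topic name
table.name.format=conditions

# Plain inserts, no primary key handling, create the table if it is missing
insert.mode=insert
pk.mode=none
auto.create=true

# The JDBC sink needs a schema for each record, so schemas are enabled
# for this connector (overrides value.converter.schemas.enable=false in the worker file)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

Note that with `value.converter.schemas.enable=true` the JsonConverter expects every message value to be wrapped as {"schema": {...}, "payload": {...}}, so plain JSON lines like the one in inventory.txt would have to be produced in that envelope (or written with a schema-aware format such as Avro) before the sink can insert them.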