Let's say we have two microservices: service_A and service_B. Each one has its own database (db_a and db_b respectively) in a single Postgres server instance (this is just a staging environment, so we don't have a cluster).
There is also another service, service_debezium (with an embedded Debezium v1.6.1.Final), that should be listening for changes in db_a and db_b. So basically there are two Debezium engines configured in this service.
But somehow service_debezium cannot listen for db_a and db_b at the same time. It only listens for one of them for some reason, and there are no error logs.
Additionally, if I configure service_debezium (i.e. its Debezium engine) to listen for either db_a or db_b alone, it works just as expected, so I'm certain the configuration properties are correct and that everything works when there is only one engine.
The Postgres connector only takes a single database.dbname in its configuration, so I guess the preferred way is to define a new Debezium engine for each database. Is that correct?
Here are the Debezium configurations in service_debezium:
db_a config bean:
@Bean
public io.debezium.config.Configuration dbAConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
            .with("name", "db_a_connector")
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
            .with("offset.flush.interval.ms", "60000")
            .with("database.hostname", "localhost")
            .with("database.port", 5432)
            .with("database.user", "postgres")
            .with("database.password", "*****")
            .with("database.dbname", "db_a")
            .with("table.whitelist", "public.dummy_table,public.another_dummy_table")
            .with("plugin.name", "pgoutput")
            .with("slot.name", "db_a_connector")
            .with("database.server.name", "db_a_server")
            .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
            .build();
}
db_b config bean:
@Bean
public io.debezium.config.Configuration dbBConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
            .with("name", "db_b_connector")
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
            .with("offset.flush.interval.ms", "60000")
            .with("database.hostname", "localhost")
            .with("database.port", 5432)
            .with("database.user", "postgres")
            .with("database.password", "*****")
            .with("database.dbname", "db_b")
            .with("table.whitelist", "public.yet_another_table")
            .with("plugin.name", "pgoutput")
            .with("slot.name", "db_b_connector")
            .with("database.server.name", "db_b_server")
            .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
            .build();
}
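The two beans differ only in a handful of values, so the per-engine settings can be derived from the database name. The following is a minimal sketch, not part of the original setup: the class ConnectorProps and its method forDatabase are hypothetical names, it builds a plain java.util.Properties (the same type that connectorConfiguration.asProperties() yields), and it leaves out the database.history settings for brevity. The property keys and values are copied from the beans above.

```java
import java.util.Properties;

public class ConnectorProps {

    /**
     * Builds the engine properties for one Postgres database.
     * Hypothetical helper: every value that must be unique per engine
     * (connector name, slot name, server name, offset file) comes from the arguments.
     */
    public static Properties forDatabase(String dbName, String tables, String offsetFile) {
        Properties p = new Properties();

        // Settings shared by both engines (values taken from the question).
        p.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        p.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        p.setProperty("offset.flush.interval.ms", "60000");
        p.setProperty("database.hostname", "localhost");
        p.setProperty("database.port", "5432");
        p.setProperty("database.user", "postgres");
        p.setProperty("database.password", "*****");
        p.setProperty("plugin.name", "pgoutput");

        // Settings that must differ between engines.
        p.setProperty("name", dbName + "_connector");
        p.setProperty("database.dbname", dbName);
        p.setProperty("slot.name", dbName + "_connector");
        p.setProperty("database.server.name", dbName + "_server");
        p.setProperty("table.whitelist", tables);
        p.setProperty("offset.storage.file.filename", offsetFile);
        return p;
    }
}
```

With this, ConnectorProps.forDatabase("db_a", ...) and ConnectorProps.forDatabase("db_b", ...) cannot accidentally end up with the same slot name or offset file.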
Here is how I create the Debezium engine instances in service_debezium:
db_a listener:
@Component
public class DBAListener {
    public DBAListener(
            @Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(connectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
        // ...
    }
    // ...
}
db_b listener:
@Component
public class DBBListener {
    public DBBListener(
            @Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(connectorConfiguration.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
        // ...
    }
    // ...
}
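The listeners above elide how the engines are started. Since DebeziumEngine is a Runnable whose run() blocks until the engine stops, each engine needs a thread of its own. The following is a minimal sketch of that idea using only the JDK: EngineRunner and startAll are hypothetical names, and plain Runnable stand-ins are accepted so the example does not depend on Debezium itself.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EngineRunner {

    /**
     * Starts every engine on its own thread and returns the executor
     * so the caller can shut it down when the service stops.
     */
    public static ExecutorService startAll(List<? extends Runnable> engines) {
        // One thread per engine: a blocking run() must not starve the other engines.
        // newFixedThreadPool rejects 0, hence the lower bound of 1.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, engines.size()));
        for (Runnable engine : engines) {
            executor.execute(engine);
        }
        return executor;
    }
}
```

A single-threaded executor shared by both listeners would run only the first engine, which is worth ruling out when one of two engines stays silent.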
postgresql.conf:
# ...other conf
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached,
# so this parameter should be set slightly higher than the maximum number of expected clients
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3 # max number of replication slots (change requires restart)
Thanks in advance!
UPDATE: Updated the code snippets to give a full example according to @Yuri Niitsuma's answer.
When you create a Debezium connector, it creates a replication slot with the default name "debezium". When you then start a second instance, it tries to use a replication slot with the same name. Two instances cannot use the same replication slot at the same time, so this throws an error. That is only a rough explanation, but here is the solution.
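Add this configuration to each connector, giving each one its own slot name (slot names may contain only lowercase letters, digits, and underscores).
On dbAConnector:
.with("slot.name", "db_a_connector")
And on dbBConnector:
.with("slot.name", "db_b_connector")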
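Because Postgres only accepts lowercase letters, digits, and underscores in a replication slot name, it can help to derive and check the name in one place. The following is a minimal sketch: SlotNames, isValid, and forDatabase are hypothetical names, and the 63-character limit assumes a default Postgres build.

```java
import java.util.Locale;
import java.util.regex.Pattern;

public class SlotNames {

    // Lowercase letters, digits and underscores only.
    // The 63-character cap is an assumption based on the default identifier length limit.
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]{1,63}");

    /** Returns true if the name is usable as a replication slot name. */
    public static boolean isValid(String slotName) {
        return slotName != null && VALID.matcher(slotName).matches();
    }

    /** Derives a per-database slot name, e.g. "db_a" becomes "db_a_connector". */
    public static String forDatabase(String dbName) {
        String candidate = dbName.toLowerCase(Locale.ROOT)
                .replaceAll("[^a-z0-9_]", "_") + "_connector";
        if (!isValid(candidate)) {
            throw new IllegalArgumentException("Cannot derive a slot name from: " + dbName);
        }
        return candidate;
    }
}
```

For example, SlotNames.isValid("dbAConnector") is false because of the uppercase letters, while SlotNames.forDatabase("db_a") gives "db_a_connector".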
You can list the available replication slots with:
SELECT * FROM pg_replication_slots;
You can also delete an unused replication slot, such as the default "debezium" one, with:
SELECT pg_drop_replication_slot('debezium');
This is worth doing because a slot that nobody consumes keeps retaining WAL and so keeps using up disk space.