Tags: java, postgresql, spring-boot, debezium

How to use Embedded Debezium for multiple databases in a single Postgres server?


Let's say we have two microservices: service_A and service_B.
Each one has its own database (db_a and db_b, respectively) in a single Postgres server instance (this is just a staging environment, so we don't have a cluster).

There is also another service, service_debezium (with Embedded Debezium v1.6.1.Final), that should listen for changes in db_a and db_b. So basically there are two Debezium engines configured in this service.

But somehow service_debezium cannot listen to db_a and db_b at the same time. It only listens to one of them, and there are no error logs.

Additionally, if I configure service_debezium (i.e. its Debezium engine) to listen to only db_a or only db_b, it works as expected, so I'm certain their configuration properties are correct and that everything works when there is only one engine.

  1. So why can't we use multiple Debezium engines to listen to multiple databases in a single Postgres server? What am I missing here?
  2. Another alternative I considered is to use just one Debezium engine that listens to all databases in that Postgres server instance, but apparently it requires database.dbname in its configuration, so I guess the preferred way is to define a separate Debezium engine for each database. Is that correct?

Here are the Debezium configurations in service_debezium:

  • db_a config bean:
  @Bean
  public io.debezium.config.Configuration dbAConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_a_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_a")
        .with("table.whitelist", "public.dummy_table,public.another_dummy_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_a_connector") // slot names are server-wide, so each engine needs its own
        .with("database.server.name", "db_a_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }
  • db_b config bean:
  @Bean
  public io.debezium.config.Configuration dbBConnector() throws IOException {
    File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
    File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
    return io.debezium.config.Configuration.create()
        .with("name", "db_b_connector")
        .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
        .with("offset.flush.interval.ms", "60000")
        .with("database.hostname", "localhost")
        .with("database.port", 5432)
        .with("database.user", "postgres")
        .with("database.password", "*****")
        .with("database.dbname", "db_b")
        .with("table.whitelist", "public.yet_another_table")
        .with("plugin.name", "pgoutput")
        .with("slot.name", "db_b_connector") // slot names are server-wide, so each engine needs its own
        .with("database.server.name", "db_b_server")
        .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
        .build();
  }
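Question 2 comes down to the fact that, as far as I know, a Debezium Postgres connector streams exactly one database (database.dbname), so one engine per database is the expected setup. The two beans differ only in a handful of values, so a minimal sketch, assuming a hypothetical helper named forDatabase and a hypothetical state directory, can derive everything that must be unique per engine from the database name. It returns java.util.Properties because that is what DebeziumEngine.Builder.using(...) accepts, which is what the listeners already obtain via asProperties().

```java
import java.nio.file.Path;
import java.util.Properties;

public class ConnectorProperties {

    /**
     * Hypothetical helper: builds the properties for one embedded Postgres
     * connector. Every value that must differ between engines (name, slot,
     * logical server name, state files) is derived from dbName.
     */
    public static Properties forDatabase(String dbName, String tables, Path stateDir) {
        Properties props = new Properties();
        props.setProperty("name", dbName + "_connector");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // One offset file per database, so the two engines never share offsets.
        props.setProperty("offset.storage.file.filename",
                stateDir.resolve(dbName + "_offsets.dat").toString());
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "postgres");
        props.setProperty("database.password", "*****");
        // Each connector streams a single database.
        props.setProperty("database.dbname", dbName);
        props.setProperty("table.whitelist", tables);
        props.setProperty("plugin.name", "pgoutput");
        // Replication slot names are unique per server, not per database.
        props.setProperty("slot.name", dbName + "_connector");
        props.setProperty("database.server.name", dbName + "_server");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename",
                stateDir.resolve(dbName + "_dbhistory.dat").toString());
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical directory for the offset and history files.
        Path stateDir = Path.of("/var/lib/service_debezium");
        Properties a = forDatabase("db_a", "public.dummy_table,public.another_dummy_table", stateDir);
        Properties b = forDatabase("db_b", "public.yet_another_table", stateDir);
        System.out.println(a.getProperty("slot.name") + ", " + b.getProperty("slot.name"));
    }
}
```

Each listener would then pass its own Properties object to DebeziumEngine.create(...).using(...). Using a fixed path per database instead of File.createTempFile also means a restarted service finds its previous offsets, since createTempFile generates a new file name on every call; whether that matters depends on how the service is deployed.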

Here is how I create the Debezium engine instances in service_debezium:

  • db_a listener:
@Component
public class DBAListener {

  public DBAListener(
      @Qualifier("dbAConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
  • and similarly db_b listener:
@Component
public class DBBListener {

  public DBBListener(
      @Qualifier("dbBConnector") Configuration connectorConfiguration /*, ... other services */) {

    this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(connectorConfiguration.asProperties())
        .notifying(this::handleChangeEvent)
        .build();

    // ...
  }

  // ...

}
  • postgresql.conf:
# ...other conf

wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
# max number of walsender processes (i.e. # of replica databases + # of Debezium engines) (change requires restart)
# Abrupt streaming client disconnection might cause an orphaned connection slot until a timeout is reached, 
# so this parameter should be set slightly higher than the maximum number of expected clients 
# so disconnected clients can immediately reconnect
max_wal_senders = 3
max_replication_slots = 3       # max number of replication slots (change requires restart)
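The listener code above elides how each engine is started, so this is only something worth ruling out, not a confirmed cause. DebeziumEngine implements Runnable, and its run() method blocks for as long as the engine is streaming, which is why the embedded-engine examples hand it to an ExecutorService. If both listeners were to share one single-thread executor, the second engine would sit in the queue and never start, and nothing would be logged. The sketch below reproduces that with plain stand-in Runnables (hypothetical, no Debezium dependency) so the two set-ups can be compared.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EngineThreads {

    /** Hypothetical stand-in for an engine: run() blocks until stop is released. */
    static Runnable blockingEngine(CountDownLatch started, CountDownLatch stop) {
        return () -> {
            started.countDown();
            try {
                stop.await(); // blocks, like an engine that is streaming changes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    /** Waits up to one second for an engine to signal that it started. */
    private static boolean startsWithin(CountDownLatch started) {
        try {
            return started.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Submits two blocking "engines" and returns how many of them actually began running. */
    public static int runningEngines(boolean executorPerEngine) {
        CountDownLatch startedA = new CountDownLatch(1);
        CountDownLatch startedB = new CountDownLatch(1);
        CountDownLatch stop = new CountDownLatch(1);
        ExecutorService executorA = Executors.newSingleThreadExecutor();
        ExecutorService executorB = executorPerEngine ? Executors.newSingleThreadExecutor() : executorA;

        executorA.execute(blockingEngine(startedA, stop)); // db_a engine
        executorB.execute(blockingEngine(startedB, stop)); // db_b engine

        int running = (startsWithin(startedA) ? 1 : 0) + (startsWithin(startedB) ? 1 : 0);

        stop.countDown(); // corresponds to closing the engines on shutdown
        executorA.shutdown();
        executorB.shutdown();
        return running;
    }

    public static void main(String[] args) {
        System.out.println("shared executor:     " + runningEngines(false) + " of 2 running"); // 1 of 2
        System.out.println("executor per engine: " + runningEngines(true) + " of 2 running");  // 2 of 2
    }
}
```

With a shared single-thread executor the second task only runs after the first one returns, which for a real engine means after it is closed.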

Thanks in advance!

UPDATE: Updated the code snippets to give a full example according to @Yuri Niitsuma's answer.


Solution

  • When you create a Debezium Postgres connector, it creates a replication slot with the default name "debezium". When you start a second instance, it tries to use a replication slot with the same name, but two instances cannot use the same replication slot at the same time, so it fails with an error. That's a rough explanation, but here is the solution.

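    Add this configuration to each connector. Note that PostgreSQL only accepts lowercase letters, numbers, and underscores in replication slot names, so a camel-case name such as dbAConnector is rejected:

    On dbAConnector:

    .with("slot.name", "db_a_connector")


    And on dbBConnector:

    .with("slot.name", "db_b_connector")
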

    You can list the available replication slots with:

    SELECT * FROM pg_replication_slots;
    

    And you can drop an unused replication slot, such as one with the default name "debezium", with:

    SELECT pg_drop_replication_slot('debezium');
    

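    This is worth doing because an inactive slot keeps PostgreSQL from removing the WAL it still references, so it keeps consuming disk space as long as no one consumes from it.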
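The two requirements on slot.name can be checked before any engine starts. A minimal sketch, assuming a hypothetical SlotNames.problems helper that is fed the slot.name value of every configured connector: names must be unique across the whole Postgres server, because replication slots are not scoped per database (which is why the default "debezium" collides), and PostgreSQL only accepts lowercase letters, digits, and underscores, up to 63 characters (NAMEDATALEN - 1 in a default build).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

public class SlotNames {

    // Characters and length PostgreSQL accepts for replication slot names.
    private static final Pattern VALID = Pattern.compile("[a-z0-9_]{1,63}");

    /** Hypothetical startup check: returns the problems found, or an empty list if all names are usable. */
    public static List<String> problems(List<String> slotNames) {
        List<String> problems = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String name : slotNames) {
            if (!VALID.matcher(name).matches()) {
                problems.add("invalid slot name: " + name);
            }
            // Set.add returns false when the name was already used by another connector.
            if (!seen.add(name)) {
                problems.add("duplicate slot name: " + name);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(problems(List.of("db_a_connector", "db_b_connector"))); // []
        System.out.println(problems(List.of("debezium", "debezium")));             // [duplicate slot name: debezium]
        System.out.println(problems(List.of("dbAConnector")));                     // [invalid slot name: dbAConnector]
    }
}
```

Failing fast on a non-empty result surfaces the collision at startup instead of leaving one engine silently idle.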