postgresql debezium debezium-engine

Debezium with Postgresql RDS stuck at Creating replication slot with command CREATE_REPLICATION_SLOT


I recently upgraded Debezium from 1.9.6.Final to 2.6.1.Final.

As part of the migration I renamed database.server.name to topic.prefix and created publications for the pgoutput plugin (previously we used wal2json). Final config:

connector.class=io.debezium.connector.postgresql.PostgresConnector,
database.user=some_user,
database.dbname=locs,
offset.storage=com.quext.cdc.storage.CustomOffsetBackingStore,
slot.name=quext_core,
slot.drop.on.stop=true,
tasks.max=1,
schema.include.list=locs,debezium_cdc,
heartbeat.interval.ms=1800000,
plugin.name=pgoutput,
database.port=5432,
slot.max.retries=10,
slot.retry.delay.ms=60000,
topic.prefix=search-engine-server-locs,
heartbeat.action.query=INSERT INTO debezium_cdc.heartbeat (id, time) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET time = NOW();,
database.hostname=***,
database.password=***,
name=quext-core-connector,
table.include.list=debezium_cdc.heartbeat,locs.space,
engineName=core
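
The two changes described above (renaming database.server.name to topic.prefix and switching the plugin from wal2json to pgoutput) could be sketched as a small helper over java.util.Properties. This is only an illustration: the class and method names are hypothetical, and a real 1.9 to 2.x migration may need more changes than these two.

```java
import java.util.Properties;

// Hypothetical helper, not part of Debezium.
final class ConnectorConfigMigration {

    /** Returns a copy of the old properties with the two changes from the question applied. */
    static Properties migrate(Properties old) {
        Properties migrated = new Properties();
        migrated.putAll(old);

        // Move the value of database.server.name to topic.prefix,
        // unless topic.prefix has already been set explicitly.
        Object serverName = migrated.remove("database.server.name");
        if (serverName != null && !migrated.containsKey("topic.prefix")) {
            migrated.put("topic.prefix", serverName);
        }

        // Switch the logical decoding plugin to pgoutput.
        migrated.setProperty("plugin.name", "pgoutput");
        return migrated;
    }
}
```

Working on a copy keeps the old configuration available for a rollback.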

We use Debezium as a Java library and host it in a Kubernetes cluster. The RDS instance contains several databases, so each database gets its own Debezium instance with a unique slot.name, name and topic.prefix. However, the publications have the same name in every database (not sure if that matters). We use a PostgreSQL table as the offset storage.

Properties props = new Properties();

// ... set properties

engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying(record -> {
                this.handleEvent(record);
    }).build();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

When I start Debezium against a database that has no replication slot, it creates the slot, logs the messages below, and then gets stuck:

{time: 2024-04-17 18:15:37.849, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:309,trace: SearchEngineCDC,message: No replication slot 'quext_core' is present for plugin 'pgoutput' and database 'locs'}
{time: 2024-04-17 18:15:37.850, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:337,trace: SearchEngineCDC,message: Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]}
{time: 2024-04-17 18:15:37.926, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresConnectorTask:158,trace: SearchEngineCDC,message: Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='search-engine-server-locs'db='locs', lsn=LSN{1ABC/8070CE88}, txId=2180314030, timestamp=2024-04-17T17:57:53.171784Z, snapshot=TRUE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]}
{time: 2024-04-17 18:15:38.026, level: WARN, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresTaskContext:101,trace: SearchEngineCDC,message: Connector has enabled automated replication slot removal upon restart (slot.drop.on.stop = true). This setting is not recommended for production environments, as a new replication slot will be created after a connector restart, resulting in missed data change events.}
{time: 2024-04-17 18:15:38.033, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:507,trace: SearchEngineCDC,message: Creating new replication slot 'quext_core' for plugin 'PGOUTPUT'}
{time: 2024-04-17 18:15:38.132, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:150,trace: SearchEngineCDC,message: Initializing PgOutput logical decoder publication}
{time: 2024-04-17 18:15:38.138, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:529,trace: SearchEngineCDC,message: Creating replication slot with command CREATE_REPLICATION_SLOT "quext_core"  LOGICAL pgoutput}

When I stop the Debezium server, the PostgreSQL replication slot remains active for some reason, which leads to the next issue: when I restart the Debezium server, it throws an exception saying that the replication slot already exists.

If I kill the PID that holds the replication slot with pg_terminate_backend(pid), I end up back in the first case, where Debezium gets stuck on replication slot creation.
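
One thing worth ruling out when a slot stays active after a stop is a non-graceful shutdown of the embedded engine. Below is a minimal sketch of a stop routine, under the assumption that the engine is both Runnable and Closeable (DebeziumEngine is). The class and method names are hypothetical, and the helper is written against plain java.io.Closeable so it does not depend on Debezium itself. It may well not be the cause of the behaviour described here.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Debezium.
final class EngineShutdown {

    /**
     * Closes the engine, then waits for its worker thread to finish.
     * Returns true if the executor terminated within the timeout.
     */
    static boolean stop(Closeable engine, ExecutorService executor, long timeoutSeconds) {
        try {
            engine.close(); // asks the engine to stop so that its run() method returns
        } catch (IOException e) {
            System.err.println("Engine close failed: " + e.getMessage());
        }
        executor.shutdown(); // accept no new tasks, let the running one finish
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            executor.shutdownNow(); // timed out: interrupt the worker as a last resort
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a Kubernetes pod this could be registered as a JVM shutdown hook, for example `Runtime.getRuntime().addShutdownHook(new Thread(() -> EngineShutdown.stop(engine, executor, 30)))`, so that it runs when the pod receives SIGTERM. The 30-second timeout is an arbitrary choice and should stay below the pod's termination grace period.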

Example of pg_replication_slots

select
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
    *
from
    pg_replication_slots
order by
    slot_name;

{
  "replicationslotlag": "617 MB",
  "confirmedlag": null,
  "slot_name": "quext_core",
  "plugin": "pgoutput",
  "slot_type": "logical",
  "datoid": 38711948,
  "database": "locs",
  "temporary": false,
  "active": true,
  "active_pid": 17073,
  "xmin": null,
  "catalog_xmin": "2178725846",
  "restart_lsn": "1ABC/648AED10",
  "confirmed_flush_lsn": null
}

Offset table data

[
    {
        "id" : "fed74584-10b2-4c6c-85ef-9b4b87f208d7",
        "engine_name" : "search_engine",
        "offset_key" : "[\"quext-search-engine-connector\",{\"server\":\"search-engine-server-search_engine\"}]",
        "offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29396158091376,\"txId\":2180193567,\"ts_usec\":1713374167445940,\"snapshot\":true}"
    },
    {
        "id" : "ee9a8ca0-be9c-4413-9c1d-869c0da0ae00",
        "engine_name" : "core",
        "offset_key" : "[\"quext-core-connector\",{\"server\":\"search-engine-server-locs\"}]",
        "offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29397573471232,\"txId\":2180419522,\"ts_usec\":1713378721496195,\"snapshot\":true}"
    }
]
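
The offset payload stores lsn as a decimal number, while the Debezium log and pg_replication_slots show LSNs as two hexadecimal halves separated by a slash. To compare them, the number can be converted as sketched below. This assumes the stored value is the raw 64-bit WAL position; the class name is hypothetical.

```java
// Hypothetical helper for comparing stored offsets with pg_replication_slots values.
final class LsnText {

    /** Formats a 64-bit WAL position as PostgreSQL's "high/low" hexadecimal text. */
    static String format(long lsn) {
        long high = lsn >>> 32;        // upper 32 bits
        long low = lsn & 0xFFFFFFFFL;  // lower 32 bits
        return String.format("%X/%X", high, low);
    }

    /** Parses "high/low" hexadecimal text back into the 64-bit value. */
    static long parse(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        return (Long.parseLong(parts[0], 16) << 32) | Long.parseLong(parts[1], 16);
    }
}
```

For example, `LsnText.format(29397573471232L)` gives `1ABC/A7EC8C00` for the "core" engine offset above, which is ahead of the slot's restart_lsn of `1ABC/648AED10`.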

Some configuration info

  • Postgresql - RDS (PostgreSQL 12.17 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-6), 64-bit)

  • Debezium version - 2.6.1.Final

  • Plugin - pgoutput

  • Postgres instance is configured for replication and roles are assigned

  • Publications created for every database using CREATE PUBLICATION dbz_publication FOR ALL TABLES;

I tried:

  • Restarting the Debezium server (I'm not sure the shutdown is graceful from the Debezium client's perspective)

  • Killing the replication slot's active_pid with pg_terminate_backend(pid) and then starting Debezium

  • Clearing the offset table

I swear it was working at the beginning of the day, but it stopped working once I enabled several Debezium instances for the other databases (we have a single RDS instance with several databases, and we start one Debezium instance per database). Now it does not work even with a single Debezium instance.

Maybe the snapshot is the issue? But the database is only 12 MB in size.

UPD1: I rolled back the Debezium version, and now it logs:

{time: 2024-04-18 13:10:22.995, level: WARN, PID: 1, thread: pool-5-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:265,trace: SearchEngineCDC,message: Cannot obtain valid replication slot 'quext_accounting' for plugin 'wal2json' and database 'acct_1' [during attempt 43 out of 900, concurrent tx probably blocks taking snapshot.}

Solution

  • Today we figured out what the issue was, and it was on the DB side. Replication slot creation was blocked by a long-running transaction: the slot was shown as created with an active PID, but creation never completed. Once we found the blocker using pg_stat_activity and killed the blocking process, slot creation finished, and now everything works as expected.

    For those who might face this issue in the future: try creating a replication slot manually using pg_create_logical_replication_slot(...). If that does not work even without Debezium, the problem is most likely on the DB side. Our second step was to check whether the backend creating the replication slot was blocked:

    select
           pid,
           usename,
           pg_blocking_pids(pid) as blocked_by,
           query as blocked_query
    from
          pg_stat_activity
    where
          cardinality(pg_blocking_pids(pid)) > 0
          -- and pid = '...' you can filter by active_pid from pg_replication_slots view
    

    After investigating, we found a single transaction that had been running for almost a day and was blocking other transactions, which in turn blocked others, and so on down to the replication slot creation.
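
Following such a chain by hand gets tedious when there are several hops. Here is a minimal sketch of how the rows from the blocking query could be walked to find the sessions at the end of the chain. The map is assumed to hold pid to pg_blocking_pids(pid) as fetched from the query above; the class name and any PIDs passed in are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for reading the output of the blocking query.
final class BlockingChain {

    /**
     * Follows pid -> blocking pids edges from startPid and returns the pids
     * at the end of the chain: sessions that block others but wait on nobody.
     */
    static Set<Integer> rootBlockers(Map<Integer, List<Integer>> blockedBy, int startPid) {
        Set<Integer> roots = new TreeSet<>();
        Set<Integer> visited = new HashSet<>();
        Deque<Integer> stack = new ArrayDeque<>();
        stack.push(startPid);
        while (!stack.isEmpty()) {
            int pid = stack.pop();
            if (!visited.add(pid)) {
                continue; // already seen, also guards against cycles
            }
            List<Integer> blockers = blockedBy.getOrDefault(pid, List.of());
            if (blockers.isEmpty()) {
                if (pid != startPid) {
                    roots.add(pid); // blocks someone, waits on nobody
                }
            } else {
                blockers.forEach(stack::push);
            }
        }
        return roots;
    }
}
```

Starting from the active_pid of the replication slot, the returned PIDs are the candidates to inspect in pg_stat_activity before deciding whether to terminate them.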

    Unfortunately, as far as I know, it is not possible to set a timeout for active, running transactions. It is possible to set a per-statement execution timeout (statement_timeout) and an idle-in-transaction timeout (idle_in_transaction_session_timeout), but our transaction wasn't idle. You can also monitor long-running transactions using the following query:

    select
          clock_timestamp() - xact_start,
           *
    from
          pg_stat_activity
    where
          clock_timestamp() - xact_start > interval '1 minute' -- specify interval