I recently upgraded Debezium from 1.9.6.Final to 2.6.1.Final. As part of the migration I renamed database.server.name to topic.prefix and created publications for the pgoutput plugin (wal2json was used previously). Final config:
connector.class=io.debezium.connector.postgresql.PostgresConnector,
database.user=some_user,
database.dbname=locs,
offset.storage=com.quext.cdc.storage.CustomOffsetBackingStore,
slot.name=quext_core,
slot.drop.on.stop=true,
tasks.max=1,
schema.include.list=locs,debezium_cdc,
heartbeat.interval.ms=1800000,
plugin.name=pgoutput,
database.port=5432,
slot.max.retries=10,
slot.retry.delay.ms=60000,
topic.prefix=search-engine-server-locs,
heartbeat.action.query=INSERT INTO debezium_cdc.heartbeat (id, time) VALUES (1, NOW()) ON CONFLICT (id) DO UPDATE SET time = NOW();,
database.hostname=***,
database.password=***,
name=quext-core-connector,
table.include.list=debezium_cdc.heartbeat,locs.space,
engineName=core
We use Debezium as a Java library and host it in a Kubernetes cluster. Our RDS instance contains several databases, so every database has its own Debezium instance with a unique slot.name, name and topic.prefix. However, the publications have the same name in every database (not sure if that matters). For offset storage we use a PostgreSQL table. The engine is started like this:
Properties props = new Properties();
// ... set properties
engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            this.handleEvent(record);
        })
        .build();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
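Since the snippet above never closes the engine, a graceful stop may be worth ruling out as a factor. The following is only a minimal sketch, not the way the application actually shuts down: the class EngineShutdown and the method stopGracefully are hypothetical names. It relies only on the fact that DebeziumEngine is both a Runnable and a java.io.Closeable, so the helper accepts any Closeable.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Debezium.
public class EngineShutdown {

    /**
     * Closes the engine, then waits for the executor thread that runs it.
     * Returns true if the thread finished within the timeout.
     */
    public static boolean stopGracefully(Closeable engine,
                                         ExecutorService executor,
                                         long timeoutSeconds) {
        try {
            engine.close(); // ask the engine to stop its connector task
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        executor.shutdown(); // accept no new tasks, let the running one finish
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        executor.shutdownNow(); // last resort: interrupt the worker thread
        return false;
    }
}
```

With the engine and executor from the snippet above, this could be registered as a JVM shutdown hook, for example Runtime.getRuntime().addShutdownHook(new Thread(() -> EngineShutdown.stopGracefully(engine, executor, 30))), so that a SIGTERM sent to the pod gives the connector a chance to release its replication connection. The 30 second timeout is an arbitrary illustrative value.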
When I start Debezium against a database that has no replication slot, it creates the slot, logs the messages below, and then hangs:
{time: 2024-04-17 18:15:37.849, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:309,trace: SearchEngineCDC,message: No replication slot 'quext_core' is present for plugin 'pgoutput' and database 'locs'}
{time: 2024-04-17 18:15:37.850, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:337,trace: SearchEngineCDC,message: Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=null, catalogXmin=null]}
{time: 2024-04-17 18:15:37.926, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresConnectorTask:158,trace: SearchEngineCDC,message: Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='search-engine-server-locs'db='locs', lsn=LSN{1ABC/8070CE88}, txId=2180314030, timestamp=2024-04-17T17:57:53.171784Z, snapshot=TRUE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=null, lastCommitLsn=null, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]}
{time: 2024-04-17 18:15:38.026, level: WARN, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.PostgresTaskContext:101,trace: SearchEngineCDC,message: Connector has enabled automated replication slot removal upon restart (slot.drop.on.stop = true). This setting is not recommended for production environments, as a new replication slot will be created after a connector restart, resulting in missed data change events.}
{time: 2024-04-17 18:15:38.033, level: DEBUG, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:507,trace: SearchEngineCDC,message: Creating new replication slot 'quext_core' for plugin 'PGOUTPUT'}
{time: 2024-04-17 18:15:38.132, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:150,trace: SearchEngineCDC,message: Initializing PgOutput logical decoder publication}
{time: 2024-04-17 18:15:38.138, level: INFO, PID: 1, thread: pool-2-thread-1, source: io.debezium.connector.postgresql.connection.PostgresReplicationConnection:529,trace: SearchEngineCDC,message: Creating replication slot with command CREATE_REPLICATION_SLOT "quext_core" LOGICAL pgoutput}
When I stop the Debezium server, the PostgreSQL replication slot remains active for some reason, which leads to the next issue: when I restart the Debezium server, it throws an exception saying that the replication slot already exists.
If I kill the backend that holds the replication slot with pg_terminate_backend(pid), I end up back in the first case, where Debezium hangs on replication slot creation.
Example of pg_replication_slots
select
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) as replicationSlotLag,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as confirmedLag,
*
from
pg_replication_slots
order by
slot_name;
{
"replicationslotlag": "617 MB",
"confirmedlag": null,
"slot_name": "quext_core",
"plugin": "pgoutput",
"slot_type": "logical",
"datoid": 38711948,
"database": "locs",
"temporary": false,
"active": true,
"active_pid": 17073,
"xmin": null,
"catalog_xmin": "2178725846",
"restart_lsn": "1ABC/648AED10",
"confirmed_flush_lsn": null
}
Offset table data
[
{
"id" : "fed74584-10b2-4c6c-85ef-9b4b87f208d7",
"engine_name" : "search_engine",
"offset_key" : "[\"quext-search-engine-connector\",{\"server\":\"search-engine-server-search_engine\"}]",
"offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29396158091376,\"txId\":2180193567,\"ts_usec\":1713374167445940,\"snapshot\":true}"
},
{
"id" : "ee9a8ca0-be9c-4413-9c1d-869c0da0ae00",
"engine_name" : "core",
"offset_key" : "[\"quext-core-connector\",{\"server\":\"search-engine-server-locs\"}]",
"offset_payload" : "{\"last_snapshot_record\":false,\"lsn\":29397573471232,\"txId\":2180419522,\"ts_usec\":1713378721496195,\"snapshot\":true}"
}
]
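The offset table stores the LSN as a plain number, while pg_replication_slots prints it as two hexadecimal halves, which makes the two hard to compare by eye. A small sketch of the conversion follows; the class and method names are hypothetical, and it only assumes the standard textual pg_lsn format (high 32 bits, a slash, low 32 bits, both in hex).

```java
// Hypothetical helper for comparing stored offsets with pg_replication_slots.
public class LsnFormat {

    /** Formats a numeric LSN the way PostgreSQL prints pg_lsn values. */
    public static String toPgLsn(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    /** Parses text such as "1ABC/648AED10" back into a number. */
    public static long fromPgLsn(String text) {
        String[] parts = text.split("/");
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }
}
```

For the "core" row above, LsnFormat.toPgLsn(29397573471232L) gives 1ABC/A7EC8C00, which is ahead of the slot's restart_lsn 1ABC/648AED10 shown in pg_replication_slots.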
Some configuration info
Postgresql - RDS (PostgreSQL 12.17 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-6), 64-bit)
Debezium version - 2.6.1.Final
Plugin - pgoutput
Postgres instance is configured for replication and roles are assigned
Publications created for every database using CREATE PUBLICATION dbz_publication FOR ALL TABLES;
I tried:
Restarting the Debezium server (I am not sure the shutdown is graceful from the Debezium client's perspective).
Killing the replication slot's active_pid with pg_terminate_backend(pid) and then starting Debezium.
Clearing the offset table.
I swear it was working at the beginning of the day, but when I enabled several Debezium instances for other databases (we have a single RDS instance with several databases inside, and we start one Debezium instance per database) it stopped working. Now it does not work even with a single Debezium instance.
Maybe the snapshot is the issue? But the database is only 12 MB in size.
UPD1: I rolled back the Debezium version and now it logs:
{time: 2024-04-18 13:10:22.995, level: WARN, PID: 1, thread: pool-5-thread-1, source: io.debezium.connector.postgresql.connection.PostgresConnection:265,trace: SearchEngineCDC,message: Cannot obtain valid replication slot 'quext_accounting' for plugin 'wal2json' and database 'acct_1' [during attempt 43 out of 900, concurrent tx probably blocks taking snapshot.}
Today we figured out what the issue was, and it was on the database side. Replication slot creation was blocked by a long-running transaction: the slot was shown as created with an active PID, but the creation did not proceed any further. After we found that out using pg_stat_activity and killed the blocking process, replication slot creation finished, and now everything works as expected.
For those who might face this issue in the future: try to create a replication slot manually using pg_create_logical_replication_slot(...). If that does not work even without Debezium, most likely something is wrong on the database side. Our second step was to check whether the replication slot process was blocked:
select
pid,
usename,
pg_blocking_pids(pid) as blocked_by,
query as blocked_query
from
pg_stat_activity
where
cardinality(pg_blocking_pids(pid)) > 0
-- and pid = '...' you can filter by active_pid from pg_replication_slots view
After investigating, we found a single transaction that had been running for almost a day and was blocking other transactions, which in turn blocked further transactions, and eventually the replication slot creation.
Unfortunately, as far as I know, it's not possible to set a timeout for an active, running transaction. However, it is possible to set a per-statement execution timeout (statement_timeout) and an idle-in-transaction timeout (idle_in_transaction_session_timeout); the latter would not have helped us, because our transaction wasn't idle. You can also monitor long-running transactions using the following query:
select
clock_timestamp() - xact_start,
*
from
pg_stat_activity
where
clock_timestamp() - xact_start > interval '1 minute' -- specify interval