Debezium source task fails to reconnect to postgresql DB when DB container is re-created

We have a Kubernetes cluster with Debezium running as a source task that reads from PostgreSQL and writes to Kafka. Debezium, Postgres and Kafka all run in separate pods. When the Postgres pod is deleted and Kubernetes re-creates it, the Debezium pod fails to reconnect. Logs from the Debezium pod:

    2018-07-17 08:31:38,311 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
    2018-07-17 08:31:38,311 INFO   ||  [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]

Debezium continues to try to flush outstanding messages at intervals, but gives the following exception:

    2018-07-17 08:32:38,167 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Exception thrown while calling task.commit()   [org.apache.kafka.connect.runtime.WorkerSourceTask]
    org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(
    at io.debezium.connector.postgresql.PostgresConnectorTask.commit(
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(
    at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$
    at java.util.concurrent.Executors$
    at java.util.concurrent.FutureTask.runAndReset(
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(
    at java.util.concurrent.ScheduledThreadPoolExecutor$
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
    Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(
    at org.postgresql.core.v3.CopyDualImpl.flushCopy(
    at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(
    at org.postgresql.core.v3.replication.V3PGReplicationStream.forceUpdateStatus(
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.doFlushLsn(
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.flushLsn(
    at io.debezium.connector.postgresql.RecordsStreamProducer.commit(
    ... 13 more
    Caused by: Broken pipe (Write failed)
    at Method)
    at org.postgresql.core.PGStream.flush(
    at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(
    ... 19 more

Is there a way to have Debezium re-establish its connection to Postgres when it becomes available again? Or am I missing some config?

  • Debezium version 0.8
  • kubernetes version 1.10.3
  • postgres version 9.6


  • This looks like a common issue, and there are open feature requests for it in both Debezium and Kafka.

    While these are open, it looks like this is expected behaviour.

    As a workaround I've added this liveness probe to the deployment:

          livenessProbe:
            exec:
              command:
              - sh
              - -ec
              - ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1  -d'/'); reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l); if [ $reply -lt 2 ]; then exit 1; fi;
            initialDelaySeconds: 30
            periodSeconds: 5

    First clause gets the container IP address:

        ipaddress=$(ip addr | grep 'state UP' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/');

    Second clause makes the request and counts instances of 'RUNNING' in the response json:

        reply=$(curl -s $ipaddress:8083/connectors/inventory-connector/status | grep -o RUNNING | wc -l);

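    Third clause returns exit code 1 if 'RUNNING' appears fewer than two times (the status response reports one state for the connector and one for each task, so a healthy single-task connector yields two):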

        if [ $reply -lt 2 ]; then exit 1; fi

    It seems to be working in initial tests, i.e. restarting the Postgres DB triggers a restart of the Debezium container. I guess a script something like this (although perhaps 'robustified') could be included in the image to facilitate the probe.
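A slightly more robust version of that probe might look like the sketch below. It is only an illustration, not something shipped with Debezium: the function names, the 3-second timeout and the idea of passing the host in as an argument are all assumptions. It matches `"state":"RUNNING"` rather than the bare word, so a stray "RUNNING" elsewhere in the response is not counted, and it treats a failed or timed-out HTTP request as unhealthy.

```shell
#!/bin/sh
# Hypothetical probe helper: function names and defaults are assumptions,
# not part of the Debezium image.

# Count "state": "RUNNING" entries in a Kafka Connect status JSON read from stdin.
count_running() {
    grep -o '"state"[[:space:]]*:[[:space:]]*"RUNNING"' | wc -l | tr -d ' '
}

# Succeed only if at least $2 RUNNING states appear in the status JSON in $1.
# For a connector with a single task, the expected count is 2
# (one state for the connector, one for the task).
is_healthy() {
    running=$(printf '%s' "$1" | count_running)
    [ "$running" -ge "$2" ]
}

# Fetch the status from the Connect REST API and evaluate it.
# $1 = host the REST API listens on, $2 = connector name, $3 = expected count
probe() {
    host=$1
    connector=${2:-inventory-connector}
    expected=${3:-2}
    # -f makes curl fail on HTTP errors; --max-time bounds a hung request.
    status=$(curl -sf --max-time 3 \
        "http://${host}:8083/connectors/${connector}/status") || return 1
    is_healthy "$status" "$expected"
}
```

If a file like this were baked into the image, the probe command could source it and call `probe "$POD_IP" inventory-connector 2`, where `POD_IP` is assumed to be injected through the Kubernetes downward API (`status.podIP`) instead of being parsed out of `ip addr`.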