Tags: postgresql, google-cloud-platform, google-cloud-sql, debezium

Debezium with GCP Cloud SQL for PostgreSQL


Google has finally added logical replication/decoding support to Cloud SQL for PostgreSQL. I am trying to use Debezium to capture changes on the database tables and publish them as JSON to GCP Pub/Sub. To verify that the connection works, I am running Debezium Server with the GCP Pub/Sub sink, but I keep getting the following error: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory

So it seems the plugin defaults to decoderbufs, even though I set database.source.plugin.name through an environment variable. Following the documentation at https://hub.docker.com/r/debezium/server, I am using DATABASE_SOURCE_PLUGIN_NAME. Here is a snippet of my k8s manifest:

      serviceAccountName: cloudsql-client
      containers:
      - name: debezium-standalone-server-dv
        image: debezium/server:1.6
        env:
          - name: DEBEZIUM_SOURCE_CONNECTOR_CLASS
            value: io.debezium.connector.postgresql.PostgresConnector
          - name: DEBEZIUM_SOURCE_DATABASE_HOSTNAME
            value: localhost
          - name: DEBEZIUM_SOURCE_DATABASE_PORT
            value: "5432"
          - name: DEBEZIUM_SOURCE_DATABASE_USER
            value: cdc
          - name: DEBEZIUM_SOURCE_DATABASE_PASSWORD
            value: cdc
          - name: DEBEZIUM_SOURCE_DATABASE_DBNAME
            value: postgres
          - name: DEBEZIUM_SINK_TYPE
            value: pubsub
          - name: DEBEZIUM_SINK_PUBSUB_PROJECT_ID
            value: myproject
          - name: DEBEZIUM_SOURCE_OFFSET_STORAGE_FILE_FILENAME
            value: data/offsets.dat
          - name: DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS
            value: "0"
          - name: DEBEZIUM_SOURCE_DATABASE_SERVER_NAME
            value: CDC_POC
          - name: DATABASE_SOURCE_PLUGIN_NAME
            value: wal2json

On the Cloud SQL side, I have already turned on logical decoding and added the REPLICATION attribute to the user. I can create a replication slot manually by running SELECT * FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); and I can see the changes by querying the slot with SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);

So what am I missing here?


Solution

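  • I am guessing it is a typo in the Docker configuration documentation. After I changed the variable name from DATABASE_SOURCE_PLUGIN_NAME to DEBEZIUM_SOURCE_PLUGIN_NAME, Debezium picked the plugin correctly. That is consistent with the rest of the manifest, where every other source setting already uses the DEBEZIUM_SOURCE_ prefix.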
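
For reference, the rename described above could look like the fragment below. This is only a sketch of the one entry that changes, assuming the rest of the env list from the question stays exactly as posted. The comment about why the old name was ignored is an inference from the observed behavior, not something confirmed by the Debezium documentation.

```yaml
# Only the last env entry from the question changes; keep the others as they are.
env:
  # Previously DATABASE_SOURCE_PLUGIN_NAME. That name apparently was not picked up
  # as a source connector setting, so the connector fell back to decoderbufs.
  - name: DEBEZIUM_SOURCE_PLUGIN_NAME
    value: wal2json
```

After redeploying with this change, the "could not access file "decoderbufs"" error should no longer appear if the plugin name was the only problem.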