
Logical Replication for Aurora Postgres in Java


My goal is to stream the Postgres WAL inside a system that runs on the JVM. I am using Aurora RDS Postgres.

If I try to run the following:

    PGReplicationStream stream =
        pgConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("test_slot")
            .withStartPosition(lsn)
            .start();

I get the error:

; (err) ERROR: syntax error at or near "START_REPLICATION"
; (err)   Position: 1

Postgres logs the following command:

:ERROR:  syntax error at or near "START_REPLICATION" at character 1
:STATEMENT:  START_REPLICATION SLOT test_slot LOGICAL 0/40DCC70;

This does look like the command the JDBC driver is trying to send (as seen here).

START_REPLICATION does seem to be a command Postgres supports. Is it that Aurora RDS does not support this? If so, is there an alternative way to do this?


For completeness, here's the code I've built up so far. I am using Clojure, but most of this calls directly into Java methods.

  ; pool is HikariCP
  (def pg-conn (-> pool
                   (.getConnection)
                   (.unwrap org.postgresql.PGConnection)))

  ;; create slot
  (sql/execute! pool ["SELECT pg_create_logical_replication_slot('instant_server_stopa', 'wal2json');"])

  ;; log sequence number
  (def lsn (org.postgresql.replication.LogSequenceNumber/valueOf
            (:pg_current_wal_lsn (sql/select-one pg-conn
                                                 ["SELECT pg_current_wal_lsn()"]))))
  (def s (-> pg-conn
             (.getReplicationAPI)
             (.replicationStream)
             (.logical)
             (.withSlotName "instant_server_stopa")
             (.withStartPosition lsn)
             (.start)))

Solution

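  • The problem was that I was using the wrong connection. I had a HikariCP pool and assumed I could just take one of its connections. START_REPLICATION is a replication-protocol command, so the server only accepts it on a connection opened in replication mode; on a regular pooled connection it is parsed as ordinary SQL and fails with the syntax error above.

    What you need to do instead is create a dedicated 'replication' connection. Something like: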

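        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.util.Properties;
        import org.postgresql.PGProperty;

        Properties props = new Properties();
        PGProperty.USER.set(props, "user");
        PGProperty.PASSWORD.set(props, "pass");
        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
        // "database" opens a replication connection that can do logical replication
        PGProperty.REPLICATION.set(props, "database");
        // replication connections only support the simple query protocol
        PGProperty.PREFER_QUERY_MODE.set(props, "simple");

        Connection con = DriverManager.getConnection(url, props);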
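
If it helps to see the same settings without the PGProperty helper (for example, to pass them through a config file), here is a minimal sketch that builds the equivalent Properties using plain string keys. The class name ReplicationProps and the build method are hypothetical, made up for this illustration. The key names are, to the best of my knowledge, the ones the PGProperty constants above map to in pgjdbc, but check them against the driver version you use.

```java
import java.util.Properties;

/** Hypothetical helper: builds pgjdbc properties for a logical-replication connection. */
final class ReplicationProps {

    /** Same settings as the PGProperty calls above, written with plain string keys. */
    static Properties build(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("assumeMinServerVersion", "9.4");
        // "database" asks for a replication connection bound to a database,
        // which is what logical replication needs
        props.setProperty("replication", "database");
        // replication connections only speak the simple query protocol
        props.setProperty("preferQueryMode", "simple");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("user", "pass");
        // With the pgjdbc driver on the classpath, the next step would be:
        //   Connection con = DriverManager.getConnection(url, props);
        // Here we only print the settings (password masked) so the sketch runs standalone.
        props.stringPropertyNames().stream().sorted().forEach(key ->
            System.out.println(key + "=" + ("password".equals(key) ? "****" : props.getProperty(key))));
    }
}
```

The connection you get from these properties is the one to unwrap to org.postgresql.PGConnection and hand to the getReplicationAPI() chain from the question. Keep it separate from the HikariCP pool, which can still serve ordinary queries such as creating the slot.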