postgresql bash shell pipe

How to use logical decoding with pg_recvlogical to pass changes through a non-trivial pipeline, and then pipe it back to a database table?


I'm unable to pipe the output of PostgreSQL's pg_recvlogical tool through a processing command and then into a file. When I try to do this, the target file is empty and the logical decoding events are unread.

I tried this command after first creating the slot with the default output plugin.

pg_recvlogical -d postgres --slot test --start -f - | awk '{print}' > sample.txt

After inserting some data using psql in another terminal window, and then terminating the command above, I expected sample.txt to have change events. Instead, I found it to be empty.

Weirdly, it does work when I substitute cat for awk '{print}'.


Solution

  • Here is a simple recipe for using logical decoding to capture change events using the wal2json output plugin, passing them through a non-trivial jq filter, and then storing them back in the database, in a change_event table which is not itself included in change capture. It may be adapted to suit other needs.

    psql -d postgres -c "create unlogged table change_event (payload jsonb)"
    pg_recvlogical -d postgres --slot=test --create-slot -P wal2json
    pg_recvlogical -d postgres -n --slot=test -o filter-tables=public.change_event --start -f - | jq --unbuffered -rc $'select(.change|length>0)|"insert into change_event (payload) values (\'\(.)\');"' | psql -d postgres &
    

    Here's how this works.

    1. The unlogged change_event table is created with a single jsonb column, since wal2json emits JSON.
    2. pg_recvlogical creates a slot with the wal2json output plugin.
    3. pg_recvlogical captures events and exits rather than retrying if the connection is lost (-n).
    4. The filter-tables option helps avoid circular events involving the change_event table (this isn't strictly necessary, since change_event is unlogged, but it's an extra precaution).
    5. The data are emitted to stdout (-f -).
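    6. They're passed through jq, crucially with the --unbuffered switch, which makes jq flush its output after each JSON object instead of holding it in a buffer while writing to the pipe. The -r switch (raw string output) and the -c switch (compact, single-line JSON) are also useful here.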
    7. The jq filter removes events with an empty change set (select(.change|length>0)).
    8. The remaining events are interpolated into a string template to generate a SQL insert statement.
    9. The output is piped to psql in order to write to the change_event table in the database.
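
The same buffering concern likely explains the behavior in the question: awk, like jq, generally buffers its output when stdout is a file or a pipe rather than a terminal, so terminating the pipeline can discard events that were never flushed, whereas cat typically writes data out as soon as it reads it. Here is a minimal sketch of the awk variant, assuming an awk that supports fflush() (gawk, mawk, and BWK awk all do). The produce_events function and its sample lines are hypothetical stand-ins for pg_recvlogical so that the snippet runs without a database.

```shell
# Hypothetical stand-in for `pg_recvlogical -d postgres --slot test --start -f -`.
# It emits two sample lines; real output depends on the output plugin in use.
produce_events() {
  printf '%s\n' 'BEGIN 501' 'COMMIT 501'
}

# fflush() makes awk write each line out immediately instead of leaving it
# in its output buffer until the buffer fills or awk exits normally.
produce_events | awk '{ print; fflush() }' > sample.txt

cat sample.txt
```

With the real pg_recvlogical in place of produce_events, the events should appear in sample.txt as they arrive rather than only when awk's buffer fills.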