postgresql, npgsql, logical-replication

Npgsql logical replication: get the last confirmed LSN


I use Npgsql LogicalReplicationConnection to consume change data events from PostgreSQL.

It's something like this:

await foreach (var message in replicationConnection
                           .StartReplication(replicationSlot, replicationPublication, cancellationToken)) { ... }

The StartReplication method has a parameter NpgsqlLogSequenceNumber? walLocation = null.

If I leave it at its default value, replication starts from scratch every time my application starts. My application is the only consumer of the replication slot's change data events, so it would be nice to start from the last confirmed LSN.

Is there any way to get the last confirmed LSN?


Solution

  • Is there any way to get the last confirmed LSN?

    You can query PostgreSQL's system view pg_replication_slots; its confirmed_flush_lsn column holds the value you are looking for.

    Mind you, though, that this probably isn't a good idea, depending on your application's requirements. In most cases you should keep track of the last LSN you have successfully processed on the application side.

    The following is a more or less complete example of how I'd approach this if occasionally processing a message that has already been processed isn't a problem.

    using Npgsql;
    using Npgsql.Replication;
    using Npgsql.Replication.PgOutput;
    using NpgsqlTypes;
    
    const string replicationSlot = "my_slot";
    const string replicationPublication = "my_publication";
    
    var cs = new NpgsqlConnectionStringBuilder()
    {
        Host = "localhost",
        Username = "postgres",
        Password = "some_password",
        Database = "my_db"
    }.ConnectionString;
    
    var exit = new ManualResetEventSlim(false);
    using var cts = new CancellationTokenSource();
    try
    {
        var cancellationToken = cts.Token;
        Console.CancelKeyPress += (_, e) =>
        {
            cts.Cancel();
            exit.Wait();
        };
    
        await using var ds = new NpgsqlSlimDataSourceBuilder(cs).Build();
        var startLsn = NpgsqlLogSequenceNumber.Invalid;
    
        // This is just an example and not good practice in most cases: your application
        // may have processed a message, but the backend may not have received or
        // processed the feedback message confirming it before something (the
        // connection, your application, the backend) broke.
        // As a result, your application may process messages twice after a restart,
        // which is probably not what you want.
        // Typically, you'd maintain a database table or some list with the LSNs of
        // successfully processed messages in your application, which you update in a
        // transactional way.
        await using(var cmd = ds.CreateCommand(
            "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1"))
        {
            cmd.Parameters.AddWithValue(replicationSlot);
    
            await using (var reader = await cmd.ExecuteReaderAsync(cancellationToken))
                if (await reader.ReadAsync(cancellationToken))
                    startLsn = reader.GetFieldValue<NpgsqlLogSequenceNumber>(0);
        }
    
        await using var replConn = new LogicalReplicationConnection(cs);
        await replConn.Open(cancellationToken);
        PgOutputReplicationSlot slot;
        if (startLsn == NpgsqlLogSequenceNumber.Invalid)
        {
            slot = await replConn.CreatePgOutputReplicationSlot(
                replicationSlot, cancellationToken: cancellationToken);
            // You may want to do something about the existing data using the exported snapshot
            // in a transaction here.
            // Like the following (IsolationLevel requires "using System.Data;"):
            // await using var conn = await ds.OpenConnectionAsync(cancellationToken);
            // await using var tran = await conn.BeginTransactionAsync(
            //     IsolationLevel.RepeatableRead, cancellationToken);
            // // The snapshot only takes effect once this command has actually been executed
            // await using (var cmd = new NpgsqlCommand(
            //     "SET TRANSACTION SNAPSHOT '" + slot.SnapshotName + "'", conn, tran))
            //     await cmd.ExecuteNonQueryAsync(cancellationToken);
            // await using var exporter = await conn.BeginBinaryExportAsync(
            //     "COPY my_table (id, message) TO STDOUT (FORMAT BINARY)",
            //     cancellationToken);
            // while (await exporter.StartRowAsync(cancellationToken) >= 0)
            // {
            //     var id = await exporter.ReadAsync<int>(cancellationToken);
            //     string? message = null;
            //     if (exporter.IsNull)
            //         await exporter.SkipAsync(cancellationToken);
            //     else
            //         message = await exporter.ReadAsync<string>(cancellationToken);
            // }
        }
        else
            slot = new(new ReplicationSlotOptions(replicationSlot, startLsn));
    
        Console.WriteLine("Starting replication. Press Ctrl+C to exit...");
        await foreach (var message in replConn.StartReplication(
                           slot, new(replicationPublication, 1), cancellationToken))
        {
            // Do what you need to do with the message here
            Console.WriteLine(
                $"Received message type: {message.GetType().Name} " +
                $"with WAL start at {message.WalStart}, " +
                $"WAL end at {message.WalEnd}, " +
                $"and server clock {message.ServerClock} (UTC).");
            replConn.SetReplicationStatus(message.WalEnd);
    
            // The following forces a status update, which makes it less likely that the
            // server loses track of the fact that we have processed the message, but it
            // cannot rule that out completely. Obviously, this call does not come for free.
            // If we don't do this, feedback is sent on a timer, controlled by
            // replConn.WalReceiverStatusInterval.
            await replConn.SendStatusUpdate(cancellationToken);
        }
    
        Console.WriteLine(startLsn);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Shutting down");
        exit.Set();
        return 0;
    }
    catch (Exception e)
    {
        Console.Error.WriteLine($"An exception occurred: {e.Message}");
    }
    exit.Set();
    return 1;
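
The advice above about tracking the last processed LSN on the application side could be sketched roughly as follows. This is only an illustration under stated assumptions: FileLsnCheckpoint, its file path and its method names are hypothetical and not part of Npgsql, and a plain file is swapped in for the database table mentioned above, so it is not transactional with whatever your message handler does. The sketch relies only on PostgreSQL's textual LSN format (two hexadecimal numbers separated by a slash).

```csharp
using System;
using System.Globalization;
using System.IO;

// Hypothetical helper (not part of Npgsql): keeps the last processed LSN in a local file.
public sealed class FileLsnCheckpoint
{
    private readonly string _path;

    public FileLsnCheckpoint(string path) => _path = path;

    // PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash:
    // the high and the low 32 bits of a 64-bit WAL position, e.g. "16/B374D848".
    public static ulong Parse(string text)
    {
        var parts = text.Trim().Split('/');
        if (parts.Length != 2)
            throw new FormatException($"Not a valid LSN: '{text}'");

        var high = uint.Parse(parts[0], NumberStyles.AllowHexSpecifier, CultureInfo.InvariantCulture);
        var low = uint.Parse(parts[1], NumberStyles.AllowHexSpecifier, CultureInfo.InvariantCulture);
        return ((ulong)high << 32) | low;
    }

    // Inverse of Parse: formats a 64-bit WAL position in PostgreSQL's textual form.
    public static string Format(ulong lsn) =>
        $"{(uint)(lsn >> 32):X}/{(uint)lsn:X}";

    // Returns null when no checkpoint has been saved yet.
    public ulong? Load() =>
        File.Exists(_path) ? Parse(File.ReadAllText(_path)) : (ulong?)null;

    // Writes to a temporary file first and then replaces the old checkpoint,
    // so that an interrupted write should not leave a half-written file behind.
    public void Save(ulong lsn)
    {
        var tmp = _path + ".tmp";
        File.WriteAllText(tmp, Format(lsn));
        File.Move(tmp, _path, overwrite: true);
    }
}
```

In the replication loop you would call Save((ulong)message.WalEnd) once a message has been fully handled, and on startup turn a loaded value back into a start position with new NpgsqlLogSequenceNumber(value). If your handler writes to a database anyway, storing the LSN in a table within the same transaction as the processed change is the more robust variant of the same idea.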