Search code examples
akka-streamakka.net

Akka.Net PersistenceQuery not returning all results


I am using Akka.Net (v 1.3.2) and am trying to query the event journal for all events with a specific tag. I only want the events that exist at the time I query the journal. Inside an actor, I have the following code:

var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>(SqlReadJournal.Identifier);
var stream = readJournal.CurrentEventsByTag("The Tag Name", Offset.NoOffset());
var materializer = ActorMaterializer.Create(Context.System);

stream.RunForeach(envelope =>
{
    // Do some stuff with the EventEnvelope
}, materializer).Wait();

This will successfully query the event journal. However, the problem is it will only return the first 100 events. I need all of them that match the query!

Question: How do I remove the limit/filter that exists when querying the event journal by tag name?

If you need it, here is my akka.persistence configuration:

var config = Akka.Configuration.ConfigurationFactory.ParseString(@"
    akka.persistence {
        journal {
            plugin = ""akka.persistence.journal.sql-server""
            sql-server {
                class = ""Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer""
                connection-string = """ + connectionString + @"""
                schema-name = dbo
                table-name = __akka_EventJournal
                metadata-table-name = __akka_Metadata
                auto-initialize = on
            }
        }

        snapshot-store {
            plugin = ""akka.persistence.snapshot-store.sql-server""
            sql-server {
                class = ""Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer""
                connection-string = """ + connectionString + @"""
                schema-name = dbo
                table-name = __akka_SnapshotStore
                auto-initialize = on
            }
        }
    }"
);

Solution

  • There are two things to check out:

    1. You can set the maximum number of messages returned in one query by setting up akka.persistence.query.journal.sql.max-buffer-size value (see: reference.conf).
    2. Use readJournal.EventsByTag instead of readJournal.CurrentEventsByTag to get a continuous stream of events. Just keep in mind, that it won't complete by itself, but will live on waiting for new events to arrive. You can stop it explicitly i.e. by using KillSwitch.