Search code examples
akkaakka-persistence

How to retrieve all journal events using Akka persistence queries?


Akka persistence queries have following predefined operations:

EventsByPersistenceId EventsByTag CurrentEventsByPersistenceId CurrentEventsByTag AllPersistenceIds

But what if I need to get all past events, some sort of CurrentEvents operation? I can't figure out how I can implement it in Akka persistence queries terms.


Solution

  • I'm not very familiar with the persistence queries module, but these operations are all defining some akka-streams sources. You could attempt to combine them as follows:

      def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
        currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))
    

    (flatMapMerge with concurrency breadth is an alternative to flatMapConcat in case you'd like to parallelise this)