Akka Persistence Query has the following predefined operations:
EventsByPersistenceId, EventsByTag, CurrentEventsByPersistenceId, CurrentEventsByTag, AllPersistenceIds
But what if I need to get all past events, some sort of CurrentEvents operation? I can't figure out how to implement it in terms of the existing Akka persistence queries.
I'm not very familiar with the persistence queries module, but these operations all define akka-streams sources, so you could try combining them as follows:
def currentEvents(fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
currentPersistenceIds().flatMapConcat(id => currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))
(flatMapMerge with a concurrency breadth is an alternative to flatMapConcat, in case you'd like to parallelise this.)
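For completeness, here is a slightly fuller sketch of both variants. It assumes you already have a read journal that implements both CurrentPersistenceIdsQuery and CurrentEventsByPersistenceIdQuery. The object name, the Journal type alias, the method names and the default arguments are my own, for illustration only. Note that the sequence number bounds apply to each persistence id separately, and that neither variant gives you a global ordering across persistence ids.

```scala
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.scaladsl.{CurrentEventsByPersistenceIdQuery, CurrentPersistenceIdsQuery}
import akka.stream.scaladsl.Source

object CurrentEvents {
  // Hypothetical alias: any read journal that supports both "current" queries.
  type Journal = CurrentPersistenceIdsQuery with CurrentEventsByPersistenceIdQuery

  // Sequential variant: emits all events of one persistence id, then moves on to the next.
  def currentEvents(
      journal: Journal,
      fromSequenceNr: Long = 0L,
      toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] =
    journal.currentPersistenceIds().flatMapConcat { id =>
      journal.currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr)
    }

  // Parallel variant: reads up to `breadth` persistence ids at once.
  // Events of different ids are interleaved in the resulting stream.
  def currentEventsMerged(
      journal: Journal,
      breadth: Int,
      fromSequenceNr: Long = 0L,
      toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] =
    journal.currentPersistenceIds().flatMapMerge(
      breadth,
      id => journal.currentEventsByPersistenceId(id, fromSequenceNr, toSequenceNr))
}
```

You would pass in whatever you get back from PersistenceQuery(system).readJournalFor[...] for your journal plugin, provided that plugin supports both queries. The stream completes once every persistence id known at query time has been read, so events persisted afterwards are not included.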