I am experimenting with Akka, and more specifically Akka Persistence, for the first time. I am ultimately trying to implement a small toy program to replicate Akka's use in an event-sourced application. I have had success up until the point where I attempt to use ReadJournal
to project my event stream into my domain.
def main(args: Array[String]): Unit = {
val commands: EmployeeCommandStream = TestEmployeeCommandStream(EmployeeId.generate())
implicit val executionContext = ExecutionContext.global
implicit val system = ActorSystem.create("employee-service-actor-system")
implicit val mat: Materializer = ActorMaterializer()(system)
val service = system.actorOf(Props(classOf[EmployeeActor], commands.employeeId))
commands.stream.foreach(command => service.tell(command, noSender))
lazy val readJournal = PersistenceQuery(system).readJournalFor("inmemory-read-journal")
.asInstanceOf[ReadJournal
with CurrentPersistenceIdsQuery
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with EventsByPersistenceIdQuery
with EventsByTagQuery]
println(Await.result(
readJournal
.eventsByPersistenceId(commands.employeeId.toString, 0L, Long.MaxValue)
.map(_.event)
.runFold(Employee.apply())({
case (employee: Employee, event: EmployeeEvent) => employee.apply(event)
}),
Duration("10s")
))
}
My domain's only aggregate is the Employee
, so I'm just starting up an actor with the UUID representing some employee, and then I'm issuing some commands for that employee.
In the example above, if I remove the println(Await.result(...))
and replace .runFold(...)
with .runForeach(println)
, the events persisted in my actor are printed as expected for each given command. So I know that the write side of my program and ReadJournal
are both working as expected.
As-is, my program terminates with
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
So now my question is, why can't I perform runFold
to ultimately replay my event stream? Is there a better way to do this? Am I simply misusing the API?
Any help would be appreciated, thanks!
Using runFold
, you are folding over a stream. The fold will effectively terminate when the stream itself terminates.
By using eventsByPersistenceId
, you are asking for a never ending stream of live events, thus your fold won't terminate.
You should use currentEventsByPersistenceId
instead for your use case. This variant will stream the events currently available on the journal and terminate.