playframework akka cqrs akka-stream akka-persistence

Play framework, Akka Persistence, PersistenceQuery, unable to convert Source to Future Object for Action.async


I am trying to fetch the list of persisted events and send it as the response of an Action.async, but I am not able to convert the PersistenceQuery result into a Future. Here is the code:

val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
  e.event match {
    case l: String =>
      Future(Json.parse(l))
  }
}
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
  println(a)
  a :+ b
})
finalResult

I can see the output of the println inside runFold, but finalResult never completes. I even tried Await; after waiting for minutes it still had not returned. finalResult holds all of the user's activity, which I want to send as the response of an Action.async. What is the right way to convert a Source into a Future?


Solution

  • The fix is to use currentEventsByPersistenceId, which emits only the events currently stored and then completes the stream. eventsByPersistenceId is a live query that stays open waiting for new events, so the stream ends only when take completes it. take(10) completes only after 10 events have passed through; in my case there were fewer than 10, so the stream never terminated and the result was never returned.
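
The difference described above can be reproduced without Cassandra. The following is only a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer). The names TakeDemo, currentEvents, liveEvents and firstTen are hypothetical, and plain Int sources stand in for the two journal queries.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object TakeDemo {
  // Assumption: Akka 2.6+, where the implicit ActorSystem also provides the Materializer.
  implicit val system: ActorSystem = ActorSystem("take-demo")

  // Hypothetical stand-in for currentEventsByPersistenceId:
  // three stored events, then the stream completes.
  val currentEvents: Source[Int, NotUsed] = Source(1 to 3)

  // Hypothetical stand-in for eventsByPersistenceId:
  // the same three events, after which the stream stays open.
  val liveEvents: Source[Int, NotUsed] = currentEvents.concat(Source.never[Int])

  // Same shape as the question: take(10), then collect into a Future.
  def firstTen(events: Source[Int, NotUsed]): Future[Seq[Int]] =
    events.take(10).runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    // Finite source: upstream completes after 3 elements, so the Future completes.
    println(Await.result(firstTen(currentEvents), 3.seconds))

    // Live source: take(10) is still waiting for 7 more elements, so Await times out.
    println(Try(Await.result(firstTen(liveEvents), 1.second)).isFailure)

    system.terminate()
  }
}
```

In the question's code, the finite case corresponds to calling queries.currentEventsByPersistenceId("MYID", 0, Long.MaxValue) in place of eventsByPersistenceId; the rest of the pipeline can stay as it is, and the resulting Future can be mapped to a Result inside Action.async.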