I am trying to get list of persistent events and send it as response(Action.async
). But I am not able to convert PersistenceQuery
results to Future
object. Here is the code
val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
e.event match {
case l: String =>
Future(Json.parse(l))
}
}
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
println(a)
a :+ b
})
finalResult
I am able to see prints within runFold
, but finalResult
was never returned. I even tried to Await
, even after waiting for minutes, it never returned. This finalResult
shows all the user activity wanted to send it as response as Action.async
.
Please let me know what is the way to convert Source
to Future
object
Fix for this problem is preferred to use currentEventsByPersistenceId
, this returns all current events. eventsByPersistenceId
with take
terminates stream. But in my case take
function requires 10 events to be collected, in my case I had less than 10, so stream doesn't get terminated until it accumulates at least 10 events. So result was never getting returned.