I have data arriving at an actor in chunks and would like to return those chunks as a stream in a Play Result. Since the only way to get a response from an actor is to ask it, Ok.stream looks like the ideal candidate, something like this:
    Action.async { request =>
      // ask returns a Future[Any], so narrow the reply before streaming it
      (source ? GetStream()).mapTo[Enumerator[Array[Byte]]].map { enumerator =>
        Ok.stream(enumerator)
      }
    }
I'd be returning an Enumerator[Array[Byte]] from my actor and then, inside the actor, keep pushing chunks into the enumerator as messages arrive. However, returning a mutable Enumerator from an actor definitely seems like a violation of some kind.
Is there a more appropriate way to accomplish this? I figured either akka-stream or akka.io would be abstractions that might address the problem space, but I can't see how they would apply.
The solution I have settled on until a better one is suggested is to use the ActorDSL to capture the enumerator in the context of my non-actor caller:
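    case GET(p"/stream/$streamId") => Action { request =>
      // The channel is the push side; the enumerator is what Play reads from.
      val (enumerator, channel) = Concurrent.broadcast[Array[Byte]]
      actor(new Act {
        storage ! Get(streamId)
        become {
          case DataStart(id, parts, bytes) =>
            sender() ! DataAck(id)
            become {
              case DataPart(_, i, b) =>
                channel.push(b.toArray)
              case DataEnd(_) =>
                channel.eofAndEnd()
                context.stop(self) // stream finished, so release the helper actor
            }
        }
      })
      // The body returns a Result directly, so a plain Action is enough here.
      Ok.stream(enumerator).as("text/plain")
    }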
The argument against returning an enumerator from an actor is that it's mutable and non-serializable. But you need an actor to receive the sequence of messages that feeds the enumerator. By creating the actor via the DSL, it's explicitly embedded in the calling context, so there is no risk of the enumerator ever leaking across a serialization boundary.
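For anyone trying to follow the message flow, here is a rough sketch of the protocol the DSL actor handles, written as a plain state machine with no Akka or Play dependency. The message definitions are not shown above, so the field types here are guesses. Channel, Phase, StreamProtocol.step and RecordingChannel are hypothetical names made up for this illustration; Channel only mimics the push and eofAndEnd calls used on the real broadcast channel.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types: the field types are assumptions, not the real definitions.
sealed trait Msg
final case class DataStart(id: String, parts: Int, bytes: Long) extends Msg
final case class DataPart(id: String, index: Int, data: Vector[Byte]) extends Msg
final case class DataEnd(id: String) extends Msg
final case class DataAck(id: String)

// Stand-in for the push side returned by Concurrent.broadcast.
trait Channel {
  def push(chunk: Array[Byte]): Unit
  def eofAndEnd(): Unit
}

// The two phases the nested `become` calls switch between, plus a terminal phase.
sealed trait Phase
case object AwaitingStart extends Phase
case object Streaming extends Phase
case object Done extends Phase

object StreamProtocol {
  // Handles one message and returns the next phase plus an optional reply to the sender.
  def step(phase: Phase, msg: Msg, channel: Channel): (Phase, Option[DataAck]) =
    (phase, msg) match {
      case (AwaitingStart, DataStart(id, _, _)) =>
        (Streaming, Some(DataAck(id)))
      case (Streaming, DataPart(_, _, data)) =>
        channel.push(data.toArray)
        (Streaming, None)
      case (Streaming, DataEnd(_)) =>
        channel.eofAndEnd()
        (Done, None)
      case _ =>
        // Anything out of order is ignored, much like an unhandled actor message.
        (phase, None)
    }
}

// Test double that records what was pushed instead of feeding an enumerator.
final class RecordingChannel extends Channel {
  val chunks: ArrayBuffer[Vector[Byte]] = ArrayBuffer.empty
  var closed: Boolean = false
  def push(chunk: Array[Byte]): Unit = { chunks += chunk.toVector; () }
  def eofAndEnd(): Unit = { closed = true }
}
```

Feeding DataStart, a few DataPart messages and DataEnd through step in order should produce a single DataAck, push each part to the channel once, and close it at the end. The channel is only ever touched from inside the handler, which is the same property the DSL actor relies on.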