Once again I am attempting to update some pre Play 2.5 code (based on this vid). For example the following used to be how to stream a Future:
Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))
I have created the following method for the work-around for Promise.timeout
(deprecated) using Akka:
private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
val promise: Promise[Result] = Promise[Result]()
actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
promise.future
}
According to the Play Framework Migration Guide; Enumerators
should be rewritten to a Source and Source.unfoldAsync
is apparently the equivalent of Enumerator.generateM
so I was hoping that this would work (where str
is a Future[String]
):
def inf = Action { request =>
val str = keepResponding("stream me", 1.second, 2.second)
Ok.chunked(Source.unfoldAsync(str))
}
Of course I'm getting a Type mismatch error and when looking at the case class signature of unfoldAsync
:
final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])
I can see that the parameters are not correct but I'm not fully understanding what/how I should pass this through.
unfoldAsync
is even more generic than Play!'s own generateM
, as it allows you to pass through a status (S
) value. This can make the value emitted depend on the previously emitted value(s).
The example below will load values by an increasing id, until the loading fails:
val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
loadFromId(id)
.map(s ⇒ Some((id + 1, s)))
.recover{case _ ⇒ None}
}
def loadFromId(id: Int): Future[String] = ???
In your case an internal state is not really needed, therefore you can just pass dummy values whenever required, e.g.
val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
}
def schedule(data: String, delay: FiniteDuration): Future[Result] = {
akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
}
Note that your original implementation of keepResponding
is incorrect, as you cannot complete a Promise
more than once. Akka after
pattern offer a simpler way to achieve what you need.
However, note that in your specific case, Akka Streams offers a more idiomatic solution with Source.tick
:
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
loadSomeFuture()
}
def loadSomeFuture(): Future[String] = ???
or even simpler in case you don't actually need asynchronous computation as in your example
val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")