Search code examples
scalaplayframeworkstreamingakkaakka-stream

Stream Future in Play 2.5


Once again I am attempting to update some pre Play 2.5 code (based on this vid). For example the following used to be how to stream a Future:

  Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))

I have created the following method for the work-around for Promise.timeout (deprecated) using Akka:

  private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
    val promise: Promise[Result] = Promise[Result]()
    actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
    promise.future
  }

According to the Play Framework Migration Guide; Enumerators should be rewritten to a Source and Source.unfoldAsync is apparently the equivalent of Enumerator.generateM so I was hoping that this would work (where str is a Future[String]):

  def inf = Action { request =>
    val str = keepResponding("stream me", 1.second, 2.second)

    Ok.chunked(Source.unfoldAsync(str))
  }

Of course I'm getting a Type mismatch error and when looking at the case class signature of unfoldAsync:

final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])

I can see that the parameters are not correct but I'm not fully understanding what/how I should pass this through.


Solution

  • unfoldAsync is even more generic than Play!'s own generateM, as it allows you to pass through a status (S) value. This can make the value emitted depend on the previously emitted value(s).

    The example below will load values by an increasing id, until the loading fails:

    val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
      loadFromId(id)
        .map(s ⇒ Some((id + 1, s)))
        .recover{case _ ⇒ None}
    }
    
    def loadFromId(id: Int): Future[String] = ???
    

    In your case an internal state is not really needed, therefore you can just pass dummy values whenever required, e.g.

    val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
      schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
    }
    
    def schedule(data: String, delay: FiniteDuration): Future[Result] = {
      akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
    }
    

    Note that your original implementation of keepResponding is incorrect, as you cannot complete a Promise more than once. Akka after pattern offer a simpler way to achieve what you need.

    However, note that in your specific case, Akka Streams offers a more idiomatic solution with Source.tick:

    val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
      loadSomeFuture()
    }
    
    def loadSomeFuture(): Future[String] = ???
    

    or even simpler in case you don't actually need asynchronous computation as in your example

    val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")