Tags: scala, asynchronous, akka, future, polling

How to poll with a Future in Scala?


I want to poll an API endpoint until it reaches some condition. I expect it to reach this condition within a couple of seconds to a minute. I have a method that calls the endpoint and returns a Future. Is there some way I can chain Futures together to poll this endpoint every n milliseconds and give up after t tries?

Assume I have a function with the following signature:

def isComplete(): Future[Boolean] = ???

The simplest way to do this in my opinion would be to make everything blocking:

def untilComplete(): Unit = {
  for (_ <- 0 to 10) {
    // Blocks the calling thread for up to 1 second per attempt
    val status = Await.result(isComplete(), 1.second)
    if (status) return
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}

But this blocks the calling thread on every attempt, so under load it may tie up all the threads in the pool, and it is not asynchronous. I also considered doing it recursively:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(())
    else if (attempts == 0) Future.failed(new Error("Max attempts"))
    else {
      Thread.sleep(100) // still blocks a thread from the execution context
      untilComplete(isComplete(), attempts - 1)
    }
}

However, I am concerned about maxing out the call stack since this is not tail recursive.

Is there a better way of doing this?

Edit: I am using Akka.


Solution

  • You could use Akka Streams. For example, to call isComplete every 500 milliseconds until the result of the Future is true, up to a maximum of five times:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{ Sink, Source }
    import scala.concurrent.Future
    import scala.concurrent.duration._
    
    def isComplete(): Future[Boolean] = ???
    
    implicit val system = ActorSystem("MyExample")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher
    
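    // Completes with Some(true) if isComplete() returned true within five attempts,
    // or Some(false) if every attempt returned false.
    val stream: Future[Option[Boolean]] =
      Source(1 to 5)
        .throttle(1, 500.millis)
        .mapAsync(parallelism = 1)(_ => isComplete())
        .takeWhile(_ == false, inclusive = true) // inclusive: also emit the first true
        .runWith(Sink.lastOption)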
    
    stream onComplete { result =>
      println(s"Stream completed with result: $result")
      system.terminate()
    }
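
A non-stream variant that chains Futures directly is also possible. The following is a minimal sketch, assuming Akka's akka.pattern.after helper and the actor system's scheduler; PollingSketch and pollUntil are hypothetical names introduced here for illustration and are not part of Akka or of the question. Each retry is scheduled and then runs in a fresh callback, so the recursion should not deepen the call stack, and no thread is blocked while waiting.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.Future
import scala.concurrent.duration._

object PollingSketch {
  // Hypothetical helper (not an Akka API): calls `check` until it yields true,
  // waiting `interval` between attempts. Fails the returned Future once
  // `maxAttempts` calls have all returned false. Always makes at least one call.
  def pollUntil(check: () => Future[Boolean], interval: FiniteDuration, maxAttempts: Int)(
      implicit system: ActorSystem): Future[Unit] = {
    import system.dispatcher // execution context for flatMap and after

    check().flatMap {
      case true =>
        Future.successful(())
      case false if maxAttempts <= 1 =>
        Future.failed(new RuntimeException("Max attempts reached"))
      case false =>
        // Schedule the next attempt without blocking a thread.
        after(interval, system.scheduler)(pollUntil(check, interval, maxAttempts - 1))
    }
  }
}
```

With an implicit ActorSystem in scope, PollingSketch.pollUntil(() => isComplete(), 100.millis, 10) would poll roughly every 100 milliseconds and give up after ten attempts. A failed Future returned by isComplete() is propagated as-is rather than retried.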