
How to add a deadline to a future in Scala?


Suppose I've got a function fab: A => Future[B] and want it to return a new future that completes before a deadline. So I am writing a new function deadlined like this:

def deadlined[B](fut: => Future[B], deadline: Deadline): Future[B] = ???

Right now I am using java.util.Timer, but I could use ScheduledThreadPoolExecutor instead, as suggested. The best solution, though, is probably a wrapper that abstracts out the scheduling implementation so that it can be mocked in tests, as suggested in the comments.

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.Deadline

object Deadlined {

  private val timer = new java.util.Timer() // todo: replace it with a wrapper

  def apply[B](fut: => Future[B], deadline: Deadline)(implicit ec: ExecutionContext): Future[B] = {
    val promise = Promise[B]()
    val timerTask = new java.util.TimerTask {
      // tryFailure, not failure: the future may have completed the promise already
      override def run(): Unit = promise.tryFailure(new Exception(s"$deadline is exceeded"))
    }
    // Timer.schedule rejects negative delays, so clamp an already expired deadline to 0
    timer.schedule(timerTask, math.max(0L, deadline.timeLeft.toMillis))
    fut.onComplete { result =>
      timerTask.cancel()
      // tryComplete is a no-op if the timer task fired first
      promise.tryComplete(result)
    }
    promise.future
  }
}

Does it make sense? I also wonder how to factor out the common part of Deadlined and Delayed from the response to my previous question.


Solution

  • I'd probably do something similar to the following, so I could add a deadline to any Future (YMMV, Caveat Emptor, etc):

    import scala.concurrent.{Future, ExecutionContext, Promise}
    import scala.concurrent.duration._ // FiniteDuration and the 5.seconds syntax
    import java.util.{Timer, TimerTask}
    
    implicit class DeadlineFuture[T](future: Future[T]) {
      def deadline(d: FiniteDuration)(implicit timer: Timer): Future[T] = {
        if (future.isCompleted) future
        else {
          val promise = Promise[T]()
          val timerTask = new TimerTask {
            override def run(): Unit = promise.tryFailure(new Exception(s"$d is exceeded"))
          }
          timer.schedule(timerTask, d.toMillis)
          future.onComplete(_ => timerTask.cancel())(ExecutionContext.parasitic)
          promise.completeWith(future).future
        }
      }
    }
    
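    // Usage (needs an implicit Timer and an implicit ExecutionContext in scope):
    implicit val timer: Timer = new Timer(true) // daemon thread, so it does not keep the JVM alive
    import ExecutionContext.Implicits.global
    Future.never.deadline(5.seconds).onComplete(println)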
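
The wrapper mentioned in the question could look roughly like the following. This is only a sketch, assuming Scala 2.13+ (for ExecutionContext.parasitic); the names Scheduler, ExecutorScheduler, ManualScheduler and Deadlines.withDeadline are made up for illustration and do not come from any library. The idea is that the deadline logic only sees a small trait, so production code can back it with a ScheduledExecutorService while tests fire the timeout by hand.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical abstraction: schedule a task once, get back a function that cancels it.
trait Scheduler {
  def scheduleOnce(delay: FiniteDuration)(task: => Unit): () => Unit
}

// Production implementation backed by a ScheduledExecutorService.
final class ExecutorScheduler(underlying: ScheduledExecutorService) extends Scheduler {
  def scheduleOnce(delay: FiniteDuration)(task: => Unit): () => Unit = {
    val handle = underlying.schedule(
      new Runnable { def run(): Unit = task },
      delay.toMillis,
      TimeUnit.MILLISECONDS
    )
    () => { handle.cancel(false); () }
  }
}

// Test double: ignores the delay, stores tasks, and runs them only on fireAll().
final class ManualScheduler extends Scheduler {
  private var pending = Vector.empty[() => Unit]

  def scheduleOnce(delay: FiniteDuration)(task: => Unit): () => Unit = synchronized {
    val run = () => task
    pending :+= run
    () => synchronized { pending = pending.filterNot(_ eq run) }
  }

  def fireAll(): Unit = {
    val tasks = synchronized { val t = pending; pending = Vector.empty; t }
    tasks.foreach(_.apply())
  }

  def pendingCount: Int = synchronized(pending.size)
}

object Deadlines {
  // Fails the returned future with a TimeoutException if `future` is not done within `d`.
  def withDeadline[T](future: Future[T], d: FiniteDuration)(implicit scheduler: Scheduler): Future[T] = {
    val promise = Promise[T]()
    val cancel = scheduler.scheduleOnce(d) {
      promise.tryFailure(new TimeoutException(s"$d is exceeded"))
    }
    future.onComplete { result =>
      cancel()                    // the timeout is no longer needed
      promise.tryComplete(result) // no-op if the timeout already fired
    }(ExecutionContext.parasitic)
    promise.future
  }
}
```

In production you would put something like new ExecutorScheduler(java.util.concurrent.Executors.newSingleThreadScheduledExecutor()) into implicit scope; in tests a ManualScheduler lets you trigger the timeout deterministically with fireAll() instead of sleeping.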