
How can I get Scala's blocking to work with tailrec?


I'm writing Scala <-> Java interop wrappers for Futures, and I don't know the right way to implement scala.concurrent.Future.onComplete (http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future). This probably works:

def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
  executor.execute(new Runnable {
    @tailrec
    def run(): Unit = value match { // a recursive method needs an explicit result type
      case Some(t) => func(t)
      case None => { Thread.sleep(100); run() }
    }
  })
}

but "Asynchronous IO in Scala with futures" suggests that when I have to block, I should pass the relevant part of the code to scala.concurrent.blocking to let the ExecutionContext know that the thread may block. The problem is that when I surround the value match { ... } with blocking { }, the recursive call is no longer a tail call, so @tailrec fails to compile.

What's the proverbial right way to do this?

Edit: for completeness here is the entire wrapping class:

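import scala.annotation.tailrec
import scala.concurrent.{CanAwait, ExecutionContext}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

class JavaFutureWrapper[T](val jf: java.util.concurrent.Future[T]) extends scala.concurrent.Future[T] {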
  def isCompleted = jf.isDone

  def result(atMost: Duration)(implicit permit: CanAwait): T =
    atMost match { case Duration(timeout, units) => jf.get(timeout, units) }

  def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    executor.execute(new Runnable {
      @tailrec
      def run(): Unit = value match { // a recursive method needs an explicit result type
        case Some(t) => func(t)
        case None => { Thread.sleep(100); run() }
      }
    })
  }

  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = atMost match {
    case Duration(timeout, units) => {
      jf.get(timeout, units)
      this
    }
  }

  def value: Option[Try[T]] = (jf.isCancelled, jf.isDone) match {
    case (true, _) => Some(Failure(new Exception("Execution was cancelled!")))
    // jf.get throws if the task itself failed, so capture that as a Failure
    case (_, true) => Some(Try(jf.get))
    case _ => None
  }
}
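
On the tail-call problem in the question: blocking takes its body as a by-name parameter, so a recursive call placed inside blocking { ... } is compiled into a closure and is no longer in tail position. A minimal sketch of one workaround, assuming polling is still wanted, is to wrap only the sleep in blocking and keep the recursive call outside it. The names PollingSketch and awaitValue are hypothetical and not part of the original post.

```scala
import scala.annotation.tailrec
import scala.concurrent.blocking

object PollingSketch {
  // Hypothetical helper: polls `poll` until it yields a value.
  // Only the sleep is wrapped in `blocking`, so the recursive call
  // stays in tail position and @tailrec is still satisfied.
  @tailrec
  def awaitValue[A](poll: () => Option[A], sleepMillis: Long = 100L): A =
    poll() match {
      case Some(a) => a
      case None =>
        blocking { Thread.sleep(sleepMillis) } // tell the ExecutionContext this thread may stall
        awaitValue(poll, sleepMillis)          // tail call, outside the by-name block
    }
}
```

Inside the wrapper, run could then be written as func(PollingSketch.awaitValue(() => value)). This still ties up a thread while polling, so it only addresses the compile problem, not the cost of busy-waiting.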

Solution

  • Hmm, my edit to 0__'s answer didn't get approved, so for the sake of future readers, here's the solution I'm going with (simplified from 0__'s):

    def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
      executor.execute(new Runnable {
        // Block once on the Java future instead of polling; `blocking` marks the call as blocking.
        def run(): Unit = func(Try(blocking { jf.get }))
      })
    }
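
The same idea can be tried without subclassing scala.concurrent.Future at all, which may be convenient because the set of abstract members to implement differs between Scala versions. The following is a minimal sketch under that assumption: it completes a Promise from the Java future by blocking once on get. JavaFutureInterop and toScala are hypothetical names, not part of the original answer.

```scala
import java.util.concurrent.{Future => JFuture}
import scala.concurrent.{ExecutionContext, Future, Promise, blocking}
import scala.util.Try

object JavaFutureInterop {
  // Hypothetical helper: adapts a Java Future to a Scala Future.
  // One task on the ExecutionContext blocks on jf.get() and completes the promise.
  def toScala[T](jf: JFuture[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    ec.execute(new Runnable {
      // Try captures non-fatal failures such as ExecutionException or
      // CancellationException; an InterruptedException is not caught by Try.
      def run(): Unit = p.complete(Try(blocking { jf.get() }))
    })
    p.future
  }
}
```

With this in place, callbacks are registered on the returned Scala Future in the usual way, for example JavaFutureInterop.toScala(jf).onComplete(func). A failed Java task surfaces as a Failure wrapping the ExecutionException thrown by get.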